From fc76f47e43dd9aae7a99573f27f60541153887cf Mon Sep 17 00:00:00 2001 From: Fabio Berchtold Date: Fri, 27 Jan 2017 23:54:59 +0100 Subject: [PATCH] Rewriting Riemann output plugin (#1900) * rename to riemann_legacy Signed-off-by: Fabio Berchtold * initial draft for Riemann output plugin rewrite Signed-off-by: Fabio Berchtold * add unit tests Signed-off-by: Fabio Berchtold * add option to send string metrics as states Signed-off-by: Fabio Berchtold * add integration tests Signed-off-by: Fabio Berchtold * add plugin README.md Signed-off-by: Fabio Berchtold * bump riemann library * clarify settings description Signed-off-by: Fabio Berchtold * update Readme.md with updated description Signed-off-by: Fabio Berchtold * add Riemann event examples Signed-off-by: Fabio Berchtold * use full URL for Riemann server address Signed-off-by: Fabio Berchtold closes #1878 --- Godeps | 2 +- Makefile | 4 +- README.md | 1 + etc/telegraf.conf | 33 ++- plugins/outputs/all/all.go | 1 + plugins/outputs/riemann/README.md | 83 ++++++++ plugins/outputs/riemann/riemann.go | 183 +++++++++++------ plugins/outputs/riemann/riemann_test.go | 194 +++++++++++++++++- plugins/outputs/riemann_legacy/riemann.go | 156 ++++++++++++++ .../outputs/riemann_legacy/riemann_test.go | 27 +++ 10 files changed, 615 insertions(+), 69 deletions(-) create mode 100644 plugins/outputs/riemann/README.md create mode 100644 plugins/outputs/riemann_legacy/riemann.go create mode 100644 plugins/outputs/riemann_legacy/riemann_test.go diff --git a/Godeps b/Godeps index 99606414e..c033159c3 100644 --- a/Godeps +++ b/Godeps @@ -1,7 +1,7 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63 -github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 +github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 diff --git a/Makefile b/Makefile index 6c75b9295..79276f887 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,7 @@ docker-run: docker run --name redis -p "6379:6379" -d redis docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt - docker run --name riemann -p "5555:5555" -d blalor/riemann + docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann docker run --name nats -p "4222:4222" -d nats # Run docker containers necessary for CircleCI unit tests @@ -71,7 +71,7 @@ docker-run-circle: -d spotify/kafka docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt - docker run --name riemann -p "5555:5555" -d blalor/riemann + docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann docker run --name nats -p "4222:4222" -d nats # Kill all docker containers, ignore errors diff --git a/README.md b/README.md index b758609d3..29892426c 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins: * [opentsdb](./plugins/outputs/opentsdb) * [prometheus](./plugins/outputs/prometheus_client) * [riemann](./plugins/outputs/riemann) +* [riemann_legacy](./plugins/outputs/riemann_legacy) ## Contributing diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 3d0cdfd3a..aabdf180e 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -443,8 +443,39 @@ # # expiration_interval = "60s" -# # Configuration for the Riemann server to send metrics to +# # Configuration for Riemann server to send metrics to # [[outputs.riemann]] +# ## The full TCP or UDP URL of the Riemann server +# url = "tcp://localhost:5555" +# +# ## Riemann event TTL, floating-point time in seconds. +# ## Defines how long that an event is considered valid for in Riemann +# # ttl = 30.0 +# +# ## Separator to use between measurement and field name in Riemann service name +# ## This does not have any effect if 'measurement_as_attribute' is set to 'true' +# separator = "/" +# +# ## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name +# # measurement_as_attribute = false +# +# ## Send string metrics as Riemann event states. +# ## Unless enabled all string metrics will be ignored +# # string_as_state = false +# +# ## A list of tag keys whose values get sent as Riemann tags. +# ## If empty, all Telegraf tag values will be sent as tags +# # tag_keys = ["telegraf","custom_tag"] +# +# ## Additional Riemann tags to send. +# # tags = ["telegraf-output"] +# +# ## Description for Riemann event +# # description_text = "metrics collected from telegraf" + + +# # Configuration for the legacy Riemann plugin +# [[outputs.riemann_legacy]] # ## URL of server # url = "localhost:5555" # ## transport protocol to use either tcp or udp diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 96091b2ad..c10e00f78 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -20,4 +20,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" + _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" ) diff --git a/plugins/outputs/riemann/README.md b/plugins/outputs/riemann/README.md new file mode 100644 index 000000000..2338a00dc --- /dev/null +++ b/plugins/outputs/riemann/README.md @@ -0,0 +1,83 @@ +# Riemann Output Plugin + +This plugin writes to [Riemann](http://riemann.io/) via TCP or UDP. + +### Configuration: + +```toml +# Configuration for Riemann to send metrics to +[[outputs.riemann]] + ## The full TCP or UDP URL of the Riemann server + url = "tcp://localhost:5555" + + ## Riemann event TTL, floating-point time in seconds. + ## Defines how long that an event is considered valid for in Riemann + # ttl = 30.0 + + ## Separator to use between measurement and field name in Riemann service name + ## This does not have any effect if 'measurement_as_attribute' is set to 'true' + separator = "/" + + ## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name + # measurement_as_attribute = false + + ## Send string metrics as Riemann event states. + ## Unless enabled all string metrics will be ignored + # string_as_state = false + + ## A list of tag keys whose values get sent as Riemann tags. + ## If empty, all Telegraf tag values will be sent as tags + # tag_keys = ["telegraf","custom_tag"] + + ## Additional Riemann tags to send. + # tags = ["telegraf-output"] + + ## Description for Riemann event + # description_text = "metrics collected from telegraf" +``` + +### Required parameters: + +* `url`: The full TCP or UDP URL of the Riemann server to send events to. + +### Optional parameters: + +* `ttl`: Riemann event TTL, floating-point time in seconds. Defines how long that an event is considered valid for in Riemann. +* `separator`: Separator to use between measurement and field name in Riemann service name. +* `measurement_as_attribute`: Set measurement name as a Riemann attribute, instead of prepending it to the Riemann service name. +* `string_as_state`: Send string metrics as Riemann event states. If this is not enabled then all string metrics will be ignored. +* `tag_keys`: A list of tag keys whose values get sent as Riemann tags. If empty, all Telegraf tag values will be sent as tags. +* `tags`: Additional Riemann tags that will be sent. +* `description_text`: Description text for Riemann event. + +### Example Events: + +Riemann event emitted by Telegraf with default configuration: +``` +#riemann.codec.Event{ +:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0, +:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021} +``` + +Telegraf emitting the same Riemann event with `measurement_as_attribute` set to `true`: +``` +#riemann.codec.Event{ ... +:measurement "disk", :service "used_percent", :metric 73.16736001949994, +... :time 1475605021} +``` + +Telegraf emitting the same Riemann event with additional Riemann tags defined: +``` +#riemann.codec.Event{ +:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0, +:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021, +:tags ["telegraf" "postgres_cluster"]} +``` + +Telegraf emitting a Riemann event with a status text and `string_as_state` set to `true`, and a `description_text` defined: +``` +#riemann.codec.Event{ +:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state "Running", :ttl 30.0, +:description "PostgreSQL master node is up and running", +:service "status", :time 1475605021} +``` diff --git a/plugins/outputs/riemann/riemann.go b/plugins/outputs/riemann/riemann.go index fa150e097..25cf3011a 100644 --- a/plugins/outputs/riemann/riemann.go +++ b/plugins/outputs/riemann/riemann.go @@ -3,6 +3,7 @@ package riemann import ( "fmt" "log" + "net/url" "os" "sort" "strings" @@ -12,44 +13,70 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" ) -const deprecationMsg = "I! WARNING: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion." - type Riemann struct { - URL string - Transport string - Separator string + URL string + TTL float32 + Separator string + MeasurementAsAttribute bool + StringAsState bool + TagKeys []string + Tags []string + DescriptionText string client *raidman.Client } var sampleConfig = ` - ## URL of server - url = "localhost:5555" - ## transport protocol to use either tcp or udp - transport = "tcp" - ## separator to use between input name and field name in Riemann service name - separator = " " + ## The full TCP or UDP URL of the Riemann server + url = "tcp://localhost:5555" + + ## Riemann event TTL, floating-point time in seconds. + ## Defines how long that an event is considered valid for in Riemann + # ttl = 30.0 + + ## Separator to use between measurement and field name in Riemann service name + ## This does not have any effect if 'measurement_as_attribute' is set to 'true' + separator = "/" + + ## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name + # measurement_as_attribute = false + + ## Send string metrics as Riemann event states. + ## Unless enabled all string metrics will be ignored + # string_as_state = false + + ## A list of tag keys whose values get sent as Riemann tags. + ## If empty, all Telegraf tag values will be sent as tags + # tag_keys = ["telegraf","custom_tag"] + + ## Additional Riemann tags to send. + # tags = ["telegraf-output"] + + ## Description for Riemann event + # description_text = "metrics collected from telegraf" ` func (r *Riemann) Connect() error { - log.Printf(deprecationMsg) - c, err := raidman.Dial(r.Transport, r.URL) + parsed_url, err := url.Parse(r.URL) + if err != nil { + return err + } + client, err := raidman.Dial(parsed_url.Scheme, parsed_url.Host) if err != nil { r.client = nil return err } - r.client = c + r.client = client return nil } func (r *Riemann) Close() error { - if r.client == nil { - return nil + if r.client != nil { + r.client.Close() + r.client = nil } - r.client.Close() - r.client = nil return nil } @@ -62,91 +89,125 @@ func (r *Riemann) Description() string { } func (r *Riemann) Write(metrics []telegraf.Metric) error { - log.Printf(deprecationMsg) if len(metrics) == 0 { return nil } if r.client == nil { - err := r.Connect() - if err != nil { - return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err) + if err := r.Connect(); err != nil { + return fmt.Errorf("Failed to (re)connect to Riemann: %s", err.Error()) } } + // build list of Riemann events to send var events []*raidman.Event - for _, p := range metrics { - evs := buildEvents(p, r.Separator) + for _, m := range metrics { + evs := r.buildRiemannEvents(m) for _, ev := range evs { events = append(events, ev) } } - var senderr = r.client.SendMulti(events) - if senderr != nil { - r.Close() // always retuns nil - return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n", - senderr) + if err := r.client.SendMulti(events); err != nil { + r.Close() + return fmt.Errorf("Failed to send riemann message: %s", err) } - return nil } -func buildEvents(p telegraf.Metric, s string) []*raidman.Event { +func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event { events := []*raidman.Event{} - for fieldName, value := range p.Fields() { - host, ok := p.Tags()["host"] + for fieldName, value := range m.Fields() { + // get host for Riemann event + host, ok := m.Tags()["host"] if !ok { - hostname, err := os.Hostname() - if err != nil { - host = "unknown" - } else { + if hostname, err := os.Hostname(); err == nil { host = hostname + } else { + host = "unknown" } } event := &raidman.Event{ - Host: host, - Service: serviceName(s, p.Name(), p.Tags(), fieldName), + Host: host, + Ttl: r.TTL, + Description: r.DescriptionText, + Time: m.Time().Unix(), + + Attributes: r.attributes(m.Name(), m.Tags()), + Service: r.service(m.Name(), fieldName), + Tags: r.tags(m.Tags()), } switch value.(type) { case string: + // only send string metrics if explicitly enabled, skip otherwise + if !r.StringAsState { + log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value) + continue + } event.State = value.(string) - default: + case int, int64, uint64, float32, float64: event.Metric = value + default: + log.Printf("D! Riemann does not support metric value [%s]\n", value) + continue } events = append(events, event) } - return events } -func serviceName(s string, n string, t map[string]string, f string) string { - serviceStrings := []string{} - serviceStrings = append(serviceStrings, n) - - // we'll skip the 'host' tag - tagStrings := []string{} - tagNames := []string{} - - for tagName := range t { - tagNames = append(tagNames, tagName) +func (r *Riemann) attributes(name string, tags map[string]string) map[string]string { + if r.MeasurementAsAttribute { + tags["measurement"] = name } - sort.Strings(tagNames) - for _, tagName := range tagNames { - if tagName != "host" { - tagStrings = append(tagStrings, t[tagName]) + delete(tags, "host") // exclude 'host' tag + return tags +} + +func (r *Riemann) service(name string, field string) string { + var serviceStrings []string + + // if measurement is not enabled as an attribute then prepend it to service name + if !r.MeasurementAsAttribute { + serviceStrings = append(serviceStrings, name) + } + serviceStrings = append(serviceStrings, field) + + return strings.Join(serviceStrings, r.Separator) +} + +func (r *Riemann) tags(tags map[string]string) []string { + // always add specified Riemann tags + values := r.Tags + + // if tag_keys are specified, add those and return tag list + if len(r.TagKeys) > 0 { + for _, tagName := range r.TagKeys { + value, ok := tags[tagName] + if ok { + values = append(values, value) + } + } + return values + } + + // otherwise add all values from telegraf tag key/value pairs + var keys []string + for key := range tags { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + if key != "host" { // exclude 'host' tag + values = append(values, tags[key]) } } - var tagString string = strings.Join(tagStrings, s) - if tagString != "" { - serviceStrings = append(serviceStrings, tagString) - } - serviceStrings = append(serviceStrings, f) - return strings.Join(serviceStrings, s) + return values } func init() { diff --git a/plugins/outputs/riemann/riemann_test.go b/plugins/outputs/riemann/riemann_test.go index b599cdf60..10f89e786 100644 --- a/plugins/outputs/riemann/riemann_test.go +++ b/plugins/outputs/riemann/riemann_test.go @@ -1,22 +1,180 @@ package riemann import ( + "fmt" "testing" + "time" + "github.com/amir/raidman" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) +func TestAttributes(t *testing.T) { + tags := map[string]string{"tag1": "value1", "tag2": "value2"} + + r := &Riemann{} + require.Equal(t, + map[string]string{"tag1": "value1", "tag2": "value2"}, + r.attributes("test", tags)) + + // enable measurement as attribute, should now be included + r.MeasurementAsAttribute = true + require.Equal(t, + map[string]string{"tag1": "value1", "tag2": "value2", "measurement": "test"}, + r.attributes("test", tags)) +} + +func TestService(t *testing.T) { + r := &Riemann{ + Separator: "/", + } + require.Equal(t, "test/value", r.service("test", "value")) + + // enable measurement as attribute, should not be part of service name anymore + r.MeasurementAsAttribute = true + require.Equal(t, "value", r.service("test", "value")) +} + +func TestTags(t *testing.T) { + tags := map[string]string{"tag1": "value1", "tag2": "value2"} + + // all tag values plus additional tag should be present + r := &Riemann{ + Tags: []string{"test"}, + } + require.Equal(t, + []string{"test", "value1", "value2"}, + r.tags(tags)) + + // only tag2 value plus additional tag should be present + r.TagKeys = []string{"tag2"} + require.Equal(t, + []string{"test", "value2"}, + r.tags(tags)) + + // only tag1 value should be present + r.Tags = nil + r.TagKeys = []string{"tag1"} + require.Equal(t, + []string{"value1"}, + r.tags(tags)) +} + +func TestMetricEvents(t *testing.T) { + r := &Riemann{ + TTL: 20.0, + Separator: "/", + MeasurementAsAttribute: false, + DescriptionText: "metrics from telegraf", + Tags: []string{"telegraf"}, + } + + // build a single event + metric, _ := telegraf.NewMetric( + "test1", + map[string]string{"tag1": "value1", "host": "abc123"}, + map[string]interface{}{"value": 5.6}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + events := r.buildRiemannEvents(metric) + require.Len(t, events, 1) + + // is event as expected? + expectedEvent := &raidman.Event{ + Ttl: 20.0, + Time: 1257894000, + Tags: []string{"telegraf", "value1"}, + Host: "abc123", + State: "", + Service: "test1/value", + Metric: 5.6, + Description: "metrics from telegraf", + Attributes: map[string]string{"tag1": "value1"}, + } + require.Equal(t, expectedEvent, events[0]) + + // build 2 events + metric, _ = telegraf.NewMetric( + "test2", + map[string]string{"host": "xyz987"}, + map[string]interface{}{"point": 1}, + time.Date(2012, time.November, 2, 3, 0, 0, 0, time.UTC), + ) + + events = append(events, r.buildRiemannEvents(metric)...) + require.Len(t, events, 2) + + // first event should still be the same + require.Equal(t, expectedEvent, events[0]) + + // second event + expectedEvent = &raidman.Event{ + Ttl: 20.0, + Time: 1351825200, + Tags: []string{"telegraf"}, + Host: "xyz987", + State: "", + Service: "test2/point", + Metric: int64(1), + Description: "metrics from telegraf", + Attributes: map[string]string{}, + } + require.Equal(t, expectedEvent, events[1]) +} + +func TestStateEvents(t *testing.T) { + r := &Riemann{ + MeasurementAsAttribute: true, + } + + // string metrics will be skipped unless explicitly enabled + metric, _ := telegraf.NewMetric( + "test", + map[string]string{"host": "host"}, + map[string]interface{}{"value": "running"}, + time.Date(2015, time.November, 9, 22, 0, 0, 0, time.UTC), + ) + + events := r.buildRiemannEvents(metric) + // no event should be present + require.Len(t, events, 0) + + // enable string metrics as event states + r.StringAsState = true + events = r.buildRiemannEvents(metric) + require.Len(t, events, 1) + + // is event as expected? + expectedEvent := &raidman.Event{ + Ttl: 0, + Time: 1447106400, + Tags: nil, + Host: "host", + State: "running", + Service: "value", + Metric: nil, + Description: "", + Attributes: map[string]string{"measurement": "test"}, + } + require.Equal(t, expectedEvent, events[0]) +} + func TestConnectAndWrite(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - url := testutil.GetLocalHost() + ":5555" - r := &Riemann{ - URL: url, - Transport: "tcp", + URL: fmt.Sprintf("tcp://%s:5555", testutil.GetLocalHost()), + TTL: 15.0, + Separator: "/", + MeasurementAsAttribute: false, + StringAsState: true, + DescriptionText: "metrics from telegraf", + Tags: []string{"docker"}, } err := r.Connect() @@ -24,4 +182,32 @@ func TestConnectAndWrite(t *testing.T) { err = r.Write(testutil.MockMetrics()) require.NoError(t, err) + + metrics := make([]telegraf.Metric, 0) + metrics = append(metrics, testutil.TestMetric(2)) + metrics = append(metrics, testutil.TestMetric(3.456789)) + metrics = append(metrics, testutil.TestMetric(uint(0))) + metrics = append(metrics, testutil.TestMetric("ok")) + metrics = append(metrics, testutil.TestMetric("running")) + err = r.Write(metrics) + require.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + + // are there any "docker" tagged events in Riemann? + events, err := r.client.Query(`tagged "docker"`) + require.NoError(t, err) + require.NotZero(t, len(events)) + + // get Riemann events with state = "running", should be 1 event + events, err = r.client.Query(`state = "running"`) + require.NoError(t, err) + require.Len(t, events, 1) + + // is event as expected? + require.Equal(t, []string{"docker", "value1"}, events[0].Tags) + require.Equal(t, "running", events[0].State) + require.Equal(t, "test1/value", events[0].Service) + require.Equal(t, "metrics from telegraf", events[0].Description) + require.Equal(t, map[string]string{"tag1": "value1"}, events[0].Attributes) } diff --git a/plugins/outputs/riemann_legacy/riemann.go b/plugins/outputs/riemann_legacy/riemann.go new file mode 100644 index 000000000..69de7f521 --- /dev/null +++ b/plugins/outputs/riemann_legacy/riemann.go @@ -0,0 +1,156 @@ +package riemann_legacy + +import ( + "fmt" + "log" + "os" + "sort" + "strings" + + "github.com/amir/raidman" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +const deprecationMsg = "E! Error: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion." + +type Riemann struct { + URL string + Transport string + Separator string + + client *raidman.Client +} + +var sampleConfig = ` + ## URL of server + url = "localhost:5555" + ## transport protocol to use either tcp or udp + transport = "tcp" + ## separator to use between input name and field name in Riemann service name + separator = " " +` + +func (r *Riemann) Connect() error { + log.Printf(deprecationMsg) + c, err := raidman.Dial(r.Transport, r.URL) + + if err != nil { + r.client = nil + return err + } + + r.client = c + return nil +} + +func (r *Riemann) Close() error { + if r.client == nil { + return nil + } + r.client.Close() + r.client = nil + return nil +} + +func (r *Riemann) SampleConfig() string { + return sampleConfig +} + +func (r *Riemann) Description() string { + return "Configuration for the Riemann server to send metrics to" +} + +func (r *Riemann) Write(metrics []telegraf.Metric) error { + log.Printf(deprecationMsg) + if len(metrics) == 0 { + return nil + } + + if r.client == nil { + err := r.Connect() + if err != nil { + return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err) + } + } + + var events []*raidman.Event + for _, p := range metrics { + evs := buildEvents(p, r.Separator) + for _, ev := range evs { + events = append(events, ev) + } + } + + var senderr = r.client.SendMulti(events) + if senderr != nil { + r.Close() // always retuns nil + return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n", + senderr) + } + + return nil +} + +func buildEvents(p telegraf.Metric, s string) []*raidman.Event { + events := []*raidman.Event{} + for fieldName, value := range p.Fields() { + host, ok := p.Tags()["host"] + if !ok { + hostname, err := os.Hostname() + if err != nil { + host = "unknown" + } else { + host = hostname + } + } + + event := &raidman.Event{ + Host: host, + Service: serviceName(s, p.Name(), p.Tags(), fieldName), + } + + switch value.(type) { + case string: + event.State = value.(string) + default: + event.Metric = value + } + + events = append(events, event) + } + + return events +} + +func serviceName(s string, n string, t map[string]string, f string) string { + serviceStrings := []string{} + serviceStrings = append(serviceStrings, n) + + // we'll skip the 'host' tag + tagStrings := []string{} + tagNames := []string{} + + for tagName := range t { + tagNames = append(tagNames, tagName) + } + sort.Strings(tagNames) + + for _, tagName := range tagNames { + if tagName != "host" { + tagStrings = append(tagStrings, t[tagName]) + } + } + var tagString string = strings.Join(tagStrings, s) + if tagString != "" { + serviceStrings = append(serviceStrings, tagString) + } + serviceStrings = append(serviceStrings, f) + return strings.Join(serviceStrings, s) +} + +func init() { + outputs.Add("riemann_legacy", func() telegraf.Output { + return &Riemann{} + }) +} diff --git a/plugins/outputs/riemann_legacy/riemann_test.go b/plugins/outputs/riemann_legacy/riemann_test.go new file mode 100644 index 000000000..e57cbb43c --- /dev/null +++ b/plugins/outputs/riemann_legacy/riemann_test.go @@ -0,0 +1,27 @@ +package riemann_legacy + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + url := testutil.GetLocalHost() + ":5555" + + r := &Riemann{ + URL: url, + Transport: "tcp", + } + + err := r.Connect() + require.NoError(t, err) + + err = r.Write(testutil.MockMetrics()) + require.NoError(t, err) +}