From 6eea89f4c0d9786be05ef3751d6541d028122d5f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 15 Jan 2016 16:27:24 -0700 Subject: [PATCH] Make NSQ plugin compatible with version 0.10.0 --- CHANGELOG.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/nsq/nsq.go | 109 +++++++++++++----------- plugins/inputs/nsq/nsq_test.go | 149 ++++++++++++++++++++++++--------- 4 files changed, 172 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18a68ebf7..0be1ec417 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert! - [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454! - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. +- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek! ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b6b1e74da..b4c8553c3 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/inputs/mongodb" _ "github.com/influxdb/telegraf/plugins/inputs/mysql" _ "github.com/influxdb/telegraf/plugins/inputs/nginx" + _ "github.com/influxdb/telegraf/plugins/inputs/nsq" _ "github.com/influxdb/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdb/telegraf/plugins/inputs/ping" _ "github.com/influxdb/telegraf/plugins/inputs/postgresql" diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 678ea8be7..48a709a37 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -31,7 +31,7 @@ import ( "sync" "time" - "github.com/influxdb/telegraf/plugins" + "github.com/influxdb/telegraf/plugins/inputs" ) // Might add Lookupd endpoints for cluster discovery @@ -41,7 +41,7 @@ type NSQ struct { var sampleConfig = ` # An array of NSQD HTTP API endpoints - endpoints = ["http://localhost:4151","http://otherhost:4151"] + endpoints = ["http://localhost:4151"] ` const ( @@ -49,7 +49,7 @@ const ( ) func init() { - plugins.Add("nsq", func() plugins.Plugin { + inputs.Add("nsq", func() inputs.Input { return &NSQ{} }) } @@ -62,7 +62,7 @@ func (n *NSQ) Description() string { return "Read NSQ topic and channel statistics." } -func (n *NSQ) Gather(acc plugins.Accumulator) error { +func (n *NSQ) Gather(acc inputs.Accumulator) error { var wg sync.WaitGroup var outerr error @@ -85,7 +85,7 @@ var tr = &http.Transport{ var client = &http.Client{Transport: tr} -func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error { +func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error { u, err := buildURL(e) if err != nil { return err @@ -111,13 +111,15 @@ func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error { `server_version`: s.Data.Version, } + fields := make(map[string]interface{}) if s.Data.Health == `OK` { - acc.Add(`nsq_server_count`, int64(1), tags) + fields["server_count"] = int64(1) } else { - acc.Add(`nsq_server_count`, int64(0), tags) + fields["server_count"] = int64(0) } + fields["topic_count"] = int64(len(s.Data.Topics)) - acc.Add(`nsq_server_topic_count`, int64(len(s.Data.Topics)), tags) + acc.AddFields("nsq_server", fields, tags) for _, t := range s.Data.Topics { topicStats(t, acc, u.Host, s.Data.Version) } @@ -134,68 +136,77 @@ func buildURL(e string) (*url.URL, error) { return addr, nil } -func topicStats(t TopicStats, acc plugins.Accumulator, host, version string) { - +func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) { // per topic overall (tag: name, paused, channel count) tags := map[string]string{ - `server_host`: host, - `server_version`: version, - `topic`: t.Name, + "server_host": host, + "server_version": version, + "topic": t.Name, } - acc.Add(`nsq_topic_depth`, t.Depth, tags) - acc.Add(`nsq_topic_backend_depth`, t.BackendDepth, tags) - acc.Add(`nsq_topic_message_count`, t.MessageCount, tags) + fields := map[string]interface{}{ + "depth": t.Depth, + "backend_depth": t.BackendDepth, + "message_count": t.MessageCount, + "channel_count": int64(len(t.Channels)), + } + acc.AddFields("nsq_topic", fields, tags) - acc.Add(`nsq_topic_channel_count`, int64(len(t.Channels)), tags) for _, c := range t.Channels { channelStats(c, acc, host, version, t.Name) } } -func channelStats(c ChannelStats, acc plugins.Accumulator, host, version, topic string) { +func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) { tags := map[string]string{ - `server_host`: host, - `server_version`: version, - `topic`: topic, - `channel`: c.Name, + "server_host": host, + "server_version": version, + "topic": topic, + "channel": c.Name, } - acc.Add("nsq_channel_depth", c.Depth, tags) - acc.Add("nsq_channel_backend_depth", c.BackendDepth, tags) - acc.Add("nsq_channel_inflight_count", c.InFlightCount, tags) - acc.Add("nsq_channel_deferred_count", c.DeferredCount, tags) - acc.Add("nsq_channel_message_count", c.MessageCount, tags) - acc.Add("nsq_channel_requeue_count", c.RequeueCount, tags) - acc.Add("nsq_channel_timeout_count", c.TimeoutCount, tags) + fields := map[string]interface{}{ + "depth": c.Depth, + "backend_depth": c.BackendDepth, + "inflight_count": c.InFlightCount, + "deferred_count": c.DeferredCount, + "message_count": c.MessageCount, + "requeue_count": c.RequeueCount, + "timeout_count": c.TimeoutCount, + "client_count": int64(len(c.Clients)), + } - acc.Add("nsq_channel_client_count", int64(len(c.Clients)), tags) + acc.AddFields("nsq_channel", fields, tags) for _, cl := range c.Clients { clientStats(cl, acc, host, version, topic, c.Name) } } -func clientStats(c ClientStats, acc plugins.Accumulator, host, version, topic, channel string) { +func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) { tags := map[string]string{ - `server_host`: host, - `server_version`: version, - `topic`: topic, - `channel`: channel, - `client_name`: c.Name, - `client_id`: c.ID, - `client_hostname`: c.Hostname, - `client_version`: c.Version, - `client_address`: c.RemoteAddress, - `client_user_agent`: c.UserAgent, - `client_tls`: strconv.FormatBool(c.TLS), - `client_snappy`: strconv.FormatBool(c.Snappy), - `client_deflate`: strconv.FormatBool(c.Deflate), + "server_host": host, + "server_version": version, + "topic": topic, + "channel": channel, + "client_name": c.Name, + "client_id": c.ID, + "client_hostname": c.Hostname, + "client_version": c.Version, + "client_address": c.RemoteAddress, + "client_user_agent": c.UserAgent, + "client_tls": strconv.FormatBool(c.TLS), + "client_snappy": strconv.FormatBool(c.Snappy), + "client_deflate": strconv.FormatBool(c.Deflate), } - acc.Add("nsq_client_ready_count", c.ReadyCount, tags) - acc.Add("nsq_client_inflight_count", c.InFlightCount, tags) - acc.Add("nsq_client_message_count", c.MessageCount, tags) - acc.Add("nsq_client_finish_count", c.FinishCount, tags) - acc.Add("nsq_client_requeue_count", c.RequeueCount, tags) + + fields := map[string]interface{}{ + "ready_count": c.ReadyCount, + "inflight_count": c.InFlightCount, + "message_count": c.MessageCount, + "finish_count": c.FinishCount, + "requeue_count": c.RequeueCount, + } + acc.AddFields("nsq_client", fields, tags) } type NSQStats struct { diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go index 44a205c08..fc34a710b 100644 --- a/plugins/inputs/nsq/nsq_test.go +++ b/plugins/inputs/nsq/nsq_test.go @@ -9,7 +9,6 @@ import ( "github.com/influxdb/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -35,49 +34,121 @@ func TestNSQStats(t *testing.T) { // actually validate the tests tests := []struct { m string - v int64 + f map[string]interface{} g map[string]string }{ - {`nsq_server_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, - {`nsq_server_topic_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, - {`nsq_topic_depth`, int64(12), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, - {`nsq_topic_backend_depth`, int64(13), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, - {`nsq_topic_message_count`, int64(14), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, - {`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, - {`nsq_channel_depth`, int64(0), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_backend_depth`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_inflight_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_deferred_count`, int64(3), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_message_count`, int64(4), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_requeue_count`, int64(5), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_timeout_count`, int64(6), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, - {`nsq_client_ready_count`, int64(200), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, - {`nsq_client_inflight_count`, int64(7), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, - {`nsq_client_message_count`, int64(8), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, - {`nsq_client_finish_count`, int64(9), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, - {`nsq_client_requeue_count`, int64(10), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, - {`nsq_topic_depth`, int64(28), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, - {`nsq_topic_backend_depth`, int64(29), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, - {`nsq_topic_message_count`, int64(30), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, - {`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, - {`nsq_channel_depth`, int64(15), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_backend_depth`, int64(16), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_inflight_count`, int64(17), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_deferred_count`, int64(18), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_message_count`, int64(19), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_requeue_count`, int64(20), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_timeout_count`, int64(21), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, - {`nsq_client_ready_count`, int64(22), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, - {`nsq_client_inflight_count`, int64(23), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, - {`nsq_client_message_count`, int64(24), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, - {`nsq_client_finish_count`, int64(25), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, - {`nsq_client_requeue_count`, int64(26), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + { + "nsq_server", + map[string]interface{}{ + "server_count": int64(1), + "topic_count": int64(2), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + }, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(12), + "backend_depth": int64(13), + "message_count": int64(14), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(0), + "backend_depth": int64(1), + "inflight_count": int64(2), + "deferred_count": int64(3), + "message_count": int64(4), + "requeue_count": int64(5), + "timeout_count": int64(6), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1", + "channel": "c1", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(200), + "inflight_count": int64(7), + "message_count": int64(8), + "finish_count": int64(9), + "requeue_count": int64(10), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t1", "channel": "c1", "client_name": "373a715cd990", + "client_id": "373a715cd990", "client_hostname": "373a715cd990", + "client_version": "V2", "client_address": "172.17.0.11:35560", + "client_tls": "false", "client_snappy": "false", + "client_deflate": "false", + "client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"}, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(28), + "backend_depth": int64(29), + "message_count": int64(30), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(15), + "backend_depth": int64(16), + "inflight_count": int64(17), + "deferred_count": int64(18), + "message_count": int64(19), + "requeue_count": int64(20), + "timeout_count": int64(21), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2", + "channel": "c2", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(22), + "inflight_count": int64(23), + "message_count": int64(24), + "finish_count": int64(25), + "requeue_count": int64(26), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t2", "channel": "c2", "client_name": "377569bd462b", + "client_id": "377569bd462b", "client_hostname": "377569bd462b", + "client_version": "V2", "client_address": "172.17.0.8:48145", + "client_user_agent": "go-nsq/1.0.5", "client_tls": "true", + "client_snappy": "true", "client_deflate": "true"}, + }, } for _, test := range tests { - assert.True(t, acc.CheckTaggedValue(test.m, test.v, test.g), "Failed expectation: (\"%v\", \"%v\", \"%v\")", test.m, test.v, fmt.Sprint(test.g)) + acc.AssertContainsTaggedFields(t, test.m, test.f, test.g) } }