Make NSQ plugin compatible with version 0.10.0

This commit is contained in:
Cameron Sparr 2016-01-15 16:27:24 -07:00
parent dbbb2d9877
commit 6eea89f4c0
4 changed files with 172 additions and 88 deletions

View File

@ -6,6 +6,7 @@
- [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert! - [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert!
- [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454! - [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454!
- [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion.
- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek!
### Bugfixes ### Bugfixes
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!

View File

@ -19,6 +19,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/inputs/mongodb" _ "github.com/influxdb/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdb/telegraf/plugins/inputs/mysql" _ "github.com/influxdb/telegraf/plugins/inputs/mysql"
_ "github.com/influxdb/telegraf/plugins/inputs/nginx" _ "github.com/influxdb/telegraf/plugins/inputs/nginx"
_ "github.com/influxdb/telegraf/plugins/inputs/nsq"
_ "github.com/influxdb/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdb/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdb/telegraf/plugins/inputs/ping" _ "github.com/influxdb/telegraf/plugins/inputs/ping"
_ "github.com/influxdb/telegraf/plugins/inputs/postgresql" _ "github.com/influxdb/telegraf/plugins/inputs/postgresql"

View File

@ -31,7 +31,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins/inputs"
) )
// Might add Lookupd endpoints for cluster discovery // Might add Lookupd endpoints for cluster discovery
@ -41,7 +41,7 @@ type NSQ struct {
var sampleConfig = ` var sampleConfig = `
# An array of NSQD HTTP API endpoints # An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151","http://otherhost:4151"] endpoints = ["http://localhost:4151"]
` `
const ( const (
@ -49,7 +49,7 @@ const (
) )
func init() { func init() {
plugins.Add("nsq", func() plugins.Plugin { inputs.Add("nsq", func() inputs.Input {
return &NSQ{} return &NSQ{}
}) })
} }
@ -62,7 +62,7 @@ func (n *NSQ) Description() string {
return "Read NSQ topic and channel statistics." return "Read NSQ topic and channel statistics."
} }
func (n *NSQ) Gather(acc plugins.Accumulator) error { func (n *NSQ) Gather(acc inputs.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error var outerr error
@ -85,7 +85,7 @@ var tr = &http.Transport{
var client = &http.Client{Transport: tr} var client = &http.Client{Transport: tr}
func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error { func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error {
u, err := buildURL(e) u, err := buildURL(e)
if err != nil { if err != nil {
return err return err
@ -111,13 +111,15 @@ func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error {
`server_version`: s.Data.Version, `server_version`: s.Data.Version,
} }
fields := make(map[string]interface{})
if s.Data.Health == `OK` { if s.Data.Health == `OK` {
acc.Add(`nsq_server_count`, int64(1), tags) fields["server_count"] = int64(1)
} else { } else {
acc.Add(`nsq_server_count`, int64(0), tags) fields["server_count"] = int64(0)
} }
fields["topic_count"] = int64(len(s.Data.Topics))
acc.Add(`nsq_server_topic_count`, int64(len(s.Data.Topics)), tags) acc.AddFields("nsq_server", fields, tags)
for _, t := range s.Data.Topics { for _, t := range s.Data.Topics {
topicStats(t, acc, u.Host, s.Data.Version) topicStats(t, acc, u.Host, s.Data.Version)
} }
@ -134,68 +136,77 @@ func buildURL(e string) (*url.URL, error) {
return addr, nil return addr, nil
} }
func topicStats(t TopicStats, acc plugins.Accumulator, host, version string) { func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
// per topic overall (tag: name, paused, channel count) // per topic overall (tag: name, paused, channel count)
tags := map[string]string{ tags := map[string]string{
`server_host`: host, "server_host": host,
`server_version`: version, "server_version": version,
`topic`: t.Name, "topic": t.Name,
} }
acc.Add(`nsq_topic_depth`, t.Depth, tags) fields := map[string]interface{}{
acc.Add(`nsq_topic_backend_depth`, t.BackendDepth, tags) "depth": t.Depth,
acc.Add(`nsq_topic_message_count`, t.MessageCount, tags) "backend_depth": t.BackendDepth,
"message_count": t.MessageCount,
"channel_count": int64(len(t.Channels)),
}
acc.AddFields("nsq_topic", fields, tags)
acc.Add(`nsq_topic_channel_count`, int64(len(t.Channels)), tags)
for _, c := range t.Channels { for _, c := range t.Channels {
channelStats(c, acc, host, version, t.Name) channelStats(c, acc, host, version, t.Name)
} }
} }
func channelStats(c ChannelStats, acc plugins.Accumulator, host, version, topic string) { func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) {
tags := map[string]string{ tags := map[string]string{
`server_host`: host, "server_host": host,
`server_version`: version, "server_version": version,
`topic`: topic, "topic": topic,
`channel`: c.Name, "channel": c.Name,
} }
acc.Add("nsq_channel_depth", c.Depth, tags) fields := map[string]interface{}{
acc.Add("nsq_channel_backend_depth", c.BackendDepth, tags) "depth": c.Depth,
acc.Add("nsq_channel_inflight_count", c.InFlightCount, tags) "backend_depth": c.BackendDepth,
acc.Add("nsq_channel_deferred_count", c.DeferredCount, tags) "inflight_count": c.InFlightCount,
acc.Add("nsq_channel_message_count", c.MessageCount, tags) "deferred_count": c.DeferredCount,
acc.Add("nsq_channel_requeue_count", c.RequeueCount, tags) "message_count": c.MessageCount,
acc.Add("nsq_channel_timeout_count", c.TimeoutCount, tags) "requeue_count": c.RequeueCount,
"timeout_count": c.TimeoutCount,
"client_count": int64(len(c.Clients)),
}
acc.Add("nsq_channel_client_count", int64(len(c.Clients)), tags) acc.AddFields("nsq_channel", fields, tags)
for _, cl := range c.Clients { for _, cl := range c.Clients {
clientStats(cl, acc, host, version, topic, c.Name) clientStats(cl, acc, host, version, topic, c.Name)
} }
} }
func clientStats(c ClientStats, acc plugins.Accumulator, host, version, topic, channel string) { func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) {
tags := map[string]string{ tags := map[string]string{
`server_host`: host, "server_host": host,
`server_version`: version, "server_version": version,
`topic`: topic, "topic": topic,
`channel`: channel, "channel": channel,
`client_name`: c.Name, "client_name": c.Name,
`client_id`: c.ID, "client_id": c.ID,
`client_hostname`: c.Hostname, "client_hostname": c.Hostname,
`client_version`: c.Version, "client_version": c.Version,
`client_address`: c.RemoteAddress, "client_address": c.RemoteAddress,
`client_user_agent`: c.UserAgent, "client_user_agent": c.UserAgent,
`client_tls`: strconv.FormatBool(c.TLS), "client_tls": strconv.FormatBool(c.TLS),
`client_snappy`: strconv.FormatBool(c.Snappy), "client_snappy": strconv.FormatBool(c.Snappy),
`client_deflate`: strconv.FormatBool(c.Deflate), "client_deflate": strconv.FormatBool(c.Deflate),
} }
acc.Add("nsq_client_ready_count", c.ReadyCount, tags)
acc.Add("nsq_client_inflight_count", c.InFlightCount, tags) fields := map[string]interface{}{
acc.Add("nsq_client_message_count", c.MessageCount, tags) "ready_count": c.ReadyCount,
acc.Add("nsq_client_finish_count", c.FinishCount, tags) "inflight_count": c.InFlightCount,
acc.Add("nsq_client_requeue_count", c.RequeueCount, tags) "message_count": c.MessageCount,
"finish_count": c.FinishCount,
"requeue_count": c.RequeueCount,
}
acc.AddFields("nsq_client", fields, tags)
} }
type NSQStats struct { type NSQStats struct {

View File

@ -9,7 +9,6 @@ import (
"github.com/influxdb/telegraf/testutil" "github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -35,49 +34,121 @@ func TestNSQStats(t *testing.T) {
// actually validate the tests // actually validate the tests
tests := []struct { tests := []struct {
m string m string
v int64 f map[string]interface{}
g map[string]string g map[string]string
}{ }{
{`nsq_server_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, {
{`nsq_server_topic_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, "nsq_server",
{`nsq_topic_depth`, int64(12), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, map[string]interface{}{
{`nsq_topic_backend_depth`, int64(13), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, "server_count": int64(1),
{`nsq_topic_message_count`, int64(14), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, "topic_count": int64(2),
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, },
{`nsq_channel_depth`, int64(0), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, map[string]string{
{`nsq_channel_backend_depth`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, "server_host": host,
{`nsq_channel_inflight_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, "server_version": "0.3.6",
{`nsq_channel_deferred_count`, int64(3), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, },
{`nsq_channel_message_count`, int64(4), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, },
{`nsq_channel_requeue_count`, int64(5), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, {
{`nsq_channel_timeout_count`, int64(6), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, "nsq_topic",
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, map[string]interface{}{
{`nsq_client_ready_count`, int64(200), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, "depth": int64(12),
{`nsq_client_inflight_count`, int64(7), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, "backend_depth": int64(13),
{`nsq_client_message_count`, int64(8), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, "message_count": int64(14),
{`nsq_client_finish_count`, int64(9), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, "channel_count": int64(1),
{`nsq_client_requeue_count`, int64(10), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, },
{`nsq_topic_depth`, int64(28), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, map[string]string{
{`nsq_topic_backend_depth`, int64(29), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, "server_host": host,
{`nsq_topic_message_count`, int64(30), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, "server_version": "0.3.6",
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, "topic": "t1"},
{`nsq_channel_depth`, int64(15), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, },
{`nsq_channel_backend_depth`, int64(16), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, {
{`nsq_channel_inflight_count`, int64(17), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, "nsq_channel",
{`nsq_channel_deferred_count`, int64(18), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, map[string]interface{}{
{`nsq_channel_message_count`, int64(19), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, "depth": int64(0),
{`nsq_channel_requeue_count`, int64(20), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, "backend_depth": int64(1),
{`nsq_channel_timeout_count`, int64(21), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, "inflight_count": int64(2),
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, "deferred_count": int64(3),
{`nsq_client_ready_count`, int64(22), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, "message_count": int64(4),
{`nsq_client_inflight_count`, int64(23), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, "requeue_count": int64(5),
{`nsq_client_message_count`, int64(24), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, "timeout_count": int64(6),
{`nsq_client_finish_count`, int64(25), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, "client_count": int64(1),
{`nsq_client_requeue_count`, int64(26), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, },
map[string]string{
"server_host": host,
"server_version": "0.3.6",
"topic": "t1",
"channel": "c1",
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(200),
"inflight_count": int64(7),
"message_count": int64(8),
"finish_count": int64(9),
"requeue_count": int64(10),
},
map[string]string{"server_host": host, "server_version": "0.3.6",
"topic": "t1", "channel": "c1", "client_name": "373a715cd990",
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
"client_version": "V2", "client_address": "172.17.0.11:35560",
"client_tls": "false", "client_snappy": "false",
"client_deflate": "false",
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
},
{
"nsq_topic",
map[string]interface{}{
"depth": int64(28),
"backend_depth": int64(29),
"message_count": int64(30),
"channel_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "0.3.6",
"topic": "t2"},
},
{
"nsq_channel",
map[string]interface{}{
"depth": int64(15),
"backend_depth": int64(16),
"inflight_count": int64(17),
"deferred_count": int64(18),
"message_count": int64(19),
"requeue_count": int64(20),
"timeout_count": int64(21),
"client_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "0.3.6",
"topic": "t2",
"channel": "c2",
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(22),
"inflight_count": int64(23),
"message_count": int64(24),
"finish_count": int64(25),
"requeue_count": int64(26),
},
map[string]string{"server_host": host, "server_version": "0.3.6",
"topic": "t2", "channel": "c2", "client_name": "377569bd462b",
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
"client_version": "V2", "client_address": "172.17.0.8:48145",
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
"client_snappy": "true", "client_deflate": "true"},
},
} }
for _, test := range tests { for _, test := range tests {
assert.True(t, acc.CheckTaggedValue(test.m, test.v, test.g), "Failed expectation: (\"%v\", \"%v\", \"%v\")", test.m, test.v, fmt.Sprint(test.g)) acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
} }
} }