diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go new file mode 100644 index 000000000..678ea8be7 --- /dev/null +++ b/plugins/inputs/nsq/nsq.go @@ -0,0 +1,260 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Jeff Nickoloff (jeff@allingeek.com) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package nsq + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +// Might add Lookupd endpoints for cluster discovery +type NSQ struct { + Endpoints []string +} + +var sampleConfig = ` + # An array of NSQD HTTP API endpoints + endpoints = ["http://localhost:4151","http://otherhost:4151"] +` + +const ( + requestPattern = `%s/stats?format=json` +) + +func init() { + plugins.Add("nsq", func() plugins.Plugin { + return &NSQ{} + }) +} + +func (n *NSQ) SampleConfig() string { + return sampleConfig +} + +func (n *NSQ) Description() string { + return "Read NSQ topic and channel statistics." +} + +func (n *NSQ) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + var outerr error + + for _, e := range n.Endpoints { + wg.Add(1) + go func(e string) { + defer wg.Done() + outerr = n.gatherEndpoint(e, acc) + }(e) + } + + wg.Wait() + + return outerr +} + +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{Transport: tr} + +func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error { + u, err := buildURL(e) + if err != nil { + return err + } + r, err := client.Get(u.String()) + if err != nil { + return fmt.Errorf("Error while polling %s: %s", u.String(), err) + } + defer r.Body.Close() + + if r.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status) + } + + s := &NSQStats{} + err = json.NewDecoder(r.Body).Decode(s) + if err != nil { + return fmt.Errorf(`Error parsing response: %s`, err) + } + + tags := map[string]string{ + `server_host`: u.Host, + `server_version`: s.Data.Version, + } + + if s.Data.Health == `OK` { + acc.Add(`nsq_server_count`, int64(1), tags) + } else { + acc.Add(`nsq_server_count`, int64(0), tags) + } + + acc.Add(`nsq_server_topic_count`, int64(len(s.Data.Topics)), tags) + for _, t := range s.Data.Topics { + topicStats(t, acc, u.Host, s.Data.Version) + } + + return nil +} + +func buildURL(e string) (*url.URL, error) { + u := fmt.Sprintf(requestPattern, e) + addr, err := url.Parse(u) + if err != nil { + return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err) + } + return addr, nil +} + +func topicStats(t TopicStats, acc plugins.Accumulator, host, version string) { + + // per topic overall (tag: name, paused, channel count) + tags := map[string]string{ + `server_host`: host, + `server_version`: version, + `topic`: t.Name, + } + + acc.Add(`nsq_topic_depth`, t.Depth, tags) + acc.Add(`nsq_topic_backend_depth`, t.BackendDepth, tags) + acc.Add(`nsq_topic_message_count`, t.MessageCount, tags) + + acc.Add(`nsq_topic_channel_count`, int64(len(t.Channels)), tags) + for _, c := range t.Channels { + channelStats(c, acc, host, version, t.Name) + } +} + +func channelStats(c ChannelStats, acc plugins.Accumulator, host, version, topic string) { + tags := map[string]string{ + `server_host`: host, + `server_version`: version, + `topic`: topic, + `channel`: c.Name, + } + + acc.Add("nsq_channel_depth", c.Depth, tags) + acc.Add("nsq_channel_backend_depth", c.BackendDepth, tags) + acc.Add("nsq_channel_inflight_count", c.InFlightCount, tags) + acc.Add("nsq_channel_deferred_count", c.DeferredCount, tags) + acc.Add("nsq_channel_message_count", c.MessageCount, tags) + acc.Add("nsq_channel_requeue_count", c.RequeueCount, tags) + acc.Add("nsq_channel_timeout_count", c.TimeoutCount, tags) + + acc.Add("nsq_channel_client_count", int64(len(c.Clients)), tags) + for _, cl := range c.Clients { + clientStats(cl, acc, host, version, topic, c.Name) + } +} + +func clientStats(c ClientStats, acc plugins.Accumulator, host, version, topic, channel string) { + tags := map[string]string{ + `server_host`: host, + `server_version`: version, + `topic`: topic, + `channel`: channel, + `client_name`: c.Name, + `client_id`: c.ID, + `client_hostname`: c.Hostname, + `client_version`: c.Version, + `client_address`: c.RemoteAddress, + `client_user_agent`: c.UserAgent, + `client_tls`: strconv.FormatBool(c.TLS), + `client_snappy`: strconv.FormatBool(c.Snappy), + `client_deflate`: strconv.FormatBool(c.Deflate), + } + acc.Add("nsq_client_ready_count", c.ReadyCount, tags) + acc.Add("nsq_client_inflight_count", c.InFlightCount, tags) + acc.Add("nsq_client_message_count", c.MessageCount, tags) + acc.Add("nsq_client_finish_count", c.FinishCount, tags) + acc.Add("nsq_client_requeue_count", c.RequeueCount, tags) +} + +type NSQStats struct { + Code int64 `json:"status_code"` + Txt string `json:"status_txt"` + Data NSQStatsData `json:"data"` +} + +type NSQStatsData struct { + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` +} + +// e2e_processing_latency is not modeled +type TopicStats struct { + Name string `json:"topic_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount int64 `json:"message_count"` + Paused bool `json:"paused"` + Channels []ChannelStats `json:"channels"` +} + +// e2e_processing_latency is not modeled +type ChannelStats struct { + Name string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int64 `json:"in_flight_count"` + DeferredCount int64 `json:"deferred_count"` + MessageCount int64 `json:"message_count"` + RequeueCount int64 `json:"requeue_count"` + TimeoutCount int64 `json:"timeout_count"` + Paused bool `json:"paused"` + Clients []ClientStats `json:"clients"` +} + +type ClientStats struct { + Name string `json:"name"` + ID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int64 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount int64 `json:"message_count"` + FinishCount int64 `json:"finish_count"` + RequeueCount int64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int64 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + TLS bool `json:"tls"` + TLSCipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go new file mode 100644 index 000000000..44a205c08 --- /dev/null +++ b/plugins/inputs/nsq/nsq_test.go @@ -0,0 +1,202 @@ +package nsq + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdb/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNSQStats(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, response) + })) + defer ts.Close() + + n := &NSQ{ + Endpoints: []string{ts.URL}, + } + + var acc testutil.Accumulator + err := n.Gather(&acc) + require.NoError(t, err) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + host := u.Host + + // actually validate the tests + tests := []struct { + m string + v int64 + g map[string]string + }{ + {`nsq_server_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, + {`nsq_server_topic_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`}}, + {`nsq_topic_depth`, int64(12), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, + {`nsq_topic_backend_depth`, int64(13), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, + {`nsq_topic_message_count`, int64(14), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, + {`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}}, + {`nsq_channel_depth`, int64(0), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_backend_depth`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_inflight_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_deferred_count`, int64(3), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_message_count`, int64(4), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_requeue_count`, int64(5), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_timeout_count`, int64(6), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}}, + {`nsq_client_ready_count`, int64(200), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, + {`nsq_client_inflight_count`, int64(7), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, + {`nsq_client_message_count`, int64(8), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, + {`nsq_client_finish_count`, int64(9), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, + {`nsq_client_requeue_count`, int64(10), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}}, + {`nsq_topic_depth`, int64(28), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, + {`nsq_topic_backend_depth`, int64(29), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, + {`nsq_topic_message_count`, int64(30), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, + {`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}}, + {`nsq_channel_depth`, int64(15), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_backend_depth`, int64(16), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_inflight_count`, int64(17), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_deferred_count`, int64(18), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_message_count`, int64(19), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_requeue_count`, int64(20), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_timeout_count`, int64(21), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}}, + {`nsq_client_ready_count`, int64(22), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + {`nsq_client_inflight_count`, int64(23), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + {`nsq_client_message_count`, int64(24), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + {`nsq_client_finish_count`, int64(25), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + {`nsq_client_requeue_count`, int64(26), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}}, + } + + for _, test := range tests { + assert.True(t, acc.CheckTaggedValue(test.m, test.v, test.g), "Failed expectation: (\"%v\", \"%v\", \"%v\")", test.m, test.v, fmt.Sprint(test.g)) + } +} + +var response = ` +{ + "status_code": 200, + "status_txt": "OK", + "data": { + "version": "0.3.6", + "health": "OK", + "start_time": 1452021674, + "topics": [ + { + "topic_name": "t1", + "channels": [ + { + "channel_name": "c1", + "depth": 0, + "backend_depth": 1, + "in_flight_count": 2, + "deferred_count": 3, + "message_count": 4, + "requeue_count": 5, + "timeout_count": 6, + "clients": [ + { + "name": "373a715cd990", + "client_id": "373a715cd990", + "hostname": "373a715cd990", + "version": "V2", + "remote_address": "172.17.0.11:35560", + "state": 3, + "ready_count": 200, + "in_flight_count": 7, + "message_count": 8, + "finish_count": 9, + "requeue_count": 10, + "connect_ts": 1452021675, + "sample_rate": 11, + "deflate": false, + "snappy": false, + "user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5", + "tls": false, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 12, + "backend_depth": 13, + "message_count": 14, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + }, + { + "topic_name": "t2", + "channels": [ + { + "channel_name": "c2", + "depth": 15, + "backend_depth": 16, + "in_flight_count": 17, + "deferred_count": 18, + "message_count": 19, + "requeue_count": 20, + "timeout_count": 21, + "clients": [ + { + "name": "377569bd462b", + "client_id": "377569bd462b", + "hostname": "377569bd462b", + "version": "V2", + "remote_address": "172.17.0.8:48145", + "state": 3, + "ready_count": 22, + "in_flight_count": 23, + "message_count": 24, + "finish_count": 25, + "requeue_count": 26, + "connect_ts": 1452021678, + "sample_rate": 27, + "deflate": true, + "snappy": true, + "user_agent": "go-nsq\/1.0.5", + "tls": true, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 28, + "backend_depth": 29, + "message_count": 30, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ] + } +} +`