diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index b59420c26..1ef47ef05 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -25,6 +25,7 @@ package nsq import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" "strconv" @@ -101,28 +102,42 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status) } - s := &NSQStats{} - err = json.NewDecoder(r.Body).Decode(s) + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return fmt.Errorf(`Error reading body: %s`, err) + } + + data := &NSQStatsData{} + err = json.Unmarshal(body, data) if err != nil { return fmt.Errorf(`Error parsing response: %s`, err) } + // Data was not parsed correctly attempt to use old format. + if len(data.Version) < 1 { + wrapper := &NSQStats{} + err = json.Unmarshal(body, wrapper) + if err != nil { + return fmt.Errorf(`Error parsing response: %s`, err) + } + data = &wrapper.Data + } tags := map[string]string{ `server_host`: u.Host, - `server_version`: s.Data.Version, + `server_version`: data.Version, } fields := make(map[string]interface{}) - if s.Data.Health == `OK` { + if data.Health == `OK` { fields["server_count"] = int64(1) } else { fields["server_count"] = int64(0) } - fields["topic_count"] = int64(len(s.Data.Topics)) + fields["topic_count"] = int64(len(data.Topics)) acc.AddFields("nsq_server", fields, tags) - for _, t := range s.Data.Topics { - topicStats(t, acc, u.Host, s.Data.Version) + for _, t := range data.Topics { + topicStats(t, acc, u.Host, data.Version) } return nil @@ -189,7 +204,6 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, "server_version": version, "topic": topic, "channel": channel, - "client_name": c.Name, "client_id": c.ID, "client_hostname": c.Hostname, "client_version": c.Version, @@ -199,6 +213,9 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, "client_snappy": strconv.FormatBool(c.Snappy), "client_deflate": strconv.FormatBool(c.Deflate), } + if len(c.Name) > 0 { + tags["client_name"] = c.Name + } fields := map[string]interface{}{ "ready_count": c.ReadyCount, @@ -248,7 +265,7 @@ type ChannelStats struct { } type ClientStats struct { - Name string `json:"name"` + Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x ID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go index 926f99638..f3e9ce868 100644 --- a/plugins/inputs/nsq/nsq_test.go +++ b/plugins/inputs/nsq/nsq_test.go @@ -12,10 +12,267 @@ import ( "github.com/stretchr/testify/require" ) -func TestNSQStats(t *testing.T) { +func TestNSQStatsV1(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - fmt.Fprintln(w, response) + fmt.Fprintln(w, responseV1) + })) + defer ts.Close() + + n := &NSQ{ + Endpoints: []string{ts.URL}, + } + + var acc testutil.Accumulator + err := acc.GatherError(n.Gather) + require.NoError(t, err) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + host := u.Host + + // actually validate the tests + tests := []struct { + m string + f map[string]interface{} + g map[string]string + }{ + { + "nsq_server", + map[string]interface{}{ + "server_count": int64(1), + "topic_count": int64(2), + }, + map[string]string{ + "server_host": host, + "server_version": "1.0.0-compat", + }, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(12), + "backend_depth": int64(13), + "message_count": int64(14), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "1.0.0-compat", + "topic": "t1"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(0), + "backend_depth": int64(1), + "inflight_count": int64(2), + "deferred_count": int64(3), + "message_count": int64(4), + "requeue_count": int64(5), + "timeout_count": int64(6), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "1.0.0-compat", + "topic": "t1", + "channel": "c1", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(200), + "inflight_count": int64(7), + "message_count": int64(8), + "finish_count": int64(9), + "requeue_count": int64(10), + }, + map[string]string{"server_host": host, "server_version": "1.0.0-compat", + "topic": "t1", "channel": "c1", + "client_id": "373a715cd990", "client_hostname": "373a715cd990", + "client_version": "V2", "client_address": "172.17.0.11:35560", + "client_tls": "false", "client_snappy": "false", + "client_deflate": "false", + "client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"}, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(28), + "backend_depth": int64(29), + "message_count": int64(30), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "1.0.0-compat", + "topic": "t2"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(15), + "backend_depth": int64(16), + "inflight_count": int64(17), + "deferred_count": int64(18), + "message_count": int64(19), + "requeue_count": int64(20), + "timeout_count": int64(21), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "1.0.0-compat", + "topic": "t2", + "channel": "c2", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(22), + "inflight_count": int64(23), + "message_count": int64(24), + "finish_count": int64(25), + "requeue_count": int64(26), + }, + map[string]string{"server_host": host, "server_version": "1.0.0-compat", + "topic": "t2", "channel": "c2", + "client_id": "377569bd462b", "client_hostname": "377569bd462b", + "client_version": "V2", "client_address": "172.17.0.8:48145", + "client_user_agent": "go-nsq/1.0.5", "client_tls": "true", + "client_snappy": "true", "client_deflate": "true"}, + }, + } + + for _, test := range tests { + acc.AssertContainsTaggedFields(t, test.m, test.f, test.g) + } +} + +// v1 version of localhost/stats?format=json reesponse body +var responseV1 = ` +{ + "version": "1.0.0-compat", + "health": "OK", + "start_time": 1452021674, + "topics": [ + { + "topic_name": "t1", + "channels": [ + { + "channel_name": "c1", + "depth": 0, + "backend_depth": 1, + "in_flight_count": 2, + "deferred_count": 3, + "message_count": 4, + "requeue_count": 5, + "timeout_count": 6, + "clients": [ + { + "client_id": "373a715cd990", + "hostname": "373a715cd990", + "version": "V2", + "remote_address": "172.17.0.11:35560", + "state": 3, + "ready_count": 200, + "in_flight_count": 7, + "message_count": 8, + "finish_count": 9, + "requeue_count": 10, + "connect_ts": 1452021675, + "sample_rate": 11, + "deflate": false, + "snappy": false, + "user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5", + "tls": false, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 12, + "backend_depth": 13, + "message_count": 14, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + }, + { + "topic_name": "t2", + "channels": [ + { + "channel_name": "c2", + "depth": 15, + "backend_depth": 16, + "in_flight_count": 17, + "deferred_count": 18, + "message_count": 19, + "requeue_count": 20, + "timeout_count": 21, + "clients": [ + { + "client_id": "377569bd462b", + "hostname": "377569bd462b", + "version": "V2", + "remote_address": "172.17.0.8:48145", + "state": 3, + "ready_count": 22, + "in_flight_count": 23, + "message_count": 24, + "finish_count": 25, + "requeue_count": 26, + "connect_ts": 1452021678, + "sample_rate": 27, + "deflate": true, + "snappy": true, + "user_agent": "go-nsq\/1.0.5", + "tls": true, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 28, + "backend_depth": 29, + "message_count": 30, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ] + } + +` + +// TestNSQStatsPreV1 is for backwards compatibility with nsq versions < 1.0 +func TestNSQStatsPreV1(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, responsePreV1) })) defer ts.Close() @@ -152,7 +409,7 @@ func TestNSQStats(t *testing.T) { } } -var response = ` +var responsePreV1 = ` { "status_code": 200, "status_txt": "OK",