Fix NSQ input plugin when used with version 1.0.0-compat

This commit is contained in:
Ashton Kinslow 2017-08-25 20:06:48 -05:00 committed by Daniel Nelson
parent 8ecc58639a
commit 0a6541dfa8
2 changed files with 286 additions and 12 deletions

View File

@ -25,6 +25,7 @@ package nsq
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
@ -101,28 +102,42 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status) return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
} }
s := &NSQStats{} body, err := ioutil.ReadAll(r.Body)
err = json.NewDecoder(r.Body).Decode(s) if err != nil {
return fmt.Errorf(`Error reading body: %s`, err)
}
data := &NSQStatsData{}
err = json.Unmarshal(body, data)
if err != nil { if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err) return fmt.Errorf(`Error parsing response: %s`, err)
} }
// Data was not parsed correctly attempt to use old format.
if len(data.Version) < 1 {
wrapper := &NSQStats{}
err = json.Unmarshal(body, wrapper)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
}
data = &wrapper.Data
}
tags := map[string]string{ tags := map[string]string{
`server_host`: u.Host, `server_host`: u.Host,
`server_version`: s.Data.Version, `server_version`: data.Version,
} }
fields := make(map[string]interface{}) fields := make(map[string]interface{})
if s.Data.Health == `OK` { if data.Health == `OK` {
fields["server_count"] = int64(1) fields["server_count"] = int64(1)
} else { } else {
fields["server_count"] = int64(0) fields["server_count"] = int64(0)
} }
fields["topic_count"] = int64(len(s.Data.Topics)) fields["topic_count"] = int64(len(data.Topics))
acc.AddFields("nsq_server", fields, tags) acc.AddFields("nsq_server", fields, tags)
for _, t := range s.Data.Topics { for _, t := range data.Topics {
topicStats(t, acc, u.Host, s.Data.Version) topicStats(t, acc, u.Host, data.Version)
} }
return nil return nil
@ -189,7 +204,6 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
"server_version": version, "server_version": version,
"topic": topic, "topic": topic,
"channel": channel, "channel": channel,
"client_name": c.Name,
"client_id": c.ID, "client_id": c.ID,
"client_hostname": c.Hostname, "client_hostname": c.Hostname,
"client_version": c.Version, "client_version": c.Version,
@ -199,6 +213,9 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
"client_snappy": strconv.FormatBool(c.Snappy), "client_snappy": strconv.FormatBool(c.Snappy),
"client_deflate": strconv.FormatBool(c.Deflate), "client_deflate": strconv.FormatBool(c.Deflate),
} }
if len(c.Name) > 0 {
tags["client_name"] = c.Name
}
fields := map[string]interface{}{ fields := map[string]interface{}{
"ready_count": c.ReadyCount, "ready_count": c.ReadyCount,
@ -248,7 +265,7 @@ type ChannelStats struct {
} }
type ClientStats struct { type ClientStats struct {
Name string `json:"name"` Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x
ID string `json:"client_id"` ID string `json:"client_id"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Version string `json:"version"` Version string `json:"version"`

View File

@ -12,10 +12,267 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestNSQStats(t *testing.T) { func TestNSQStatsV1(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response) fmt.Fprintln(w, responseV1)
}))
defer ts.Close()
n := &NSQ{
Endpoints: []string{ts.URL},
}
var acc testutil.Accumulator
err := acc.GatherError(n.Gather)
require.NoError(t, err)
u, err := url.Parse(ts.URL)
require.NoError(t, err)
host := u.Host
// actually validate the tests
tests := []struct {
m string
f map[string]interface{}
g map[string]string
}{
{
"nsq_server",
map[string]interface{}{
"server_count": int64(1),
"topic_count": int64(2),
},
map[string]string{
"server_host": host,
"server_version": "1.0.0-compat",
},
},
{
"nsq_topic",
map[string]interface{}{
"depth": int64(12),
"backend_depth": int64(13),
"message_count": int64(14),
"channel_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "1.0.0-compat",
"topic": "t1"},
},
{
"nsq_channel",
map[string]interface{}{
"depth": int64(0),
"backend_depth": int64(1),
"inflight_count": int64(2),
"deferred_count": int64(3),
"message_count": int64(4),
"requeue_count": int64(5),
"timeout_count": int64(6),
"client_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "1.0.0-compat",
"topic": "t1",
"channel": "c1",
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(200),
"inflight_count": int64(7),
"message_count": int64(8),
"finish_count": int64(9),
"requeue_count": int64(10),
},
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
"topic": "t1", "channel": "c1",
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
"client_version": "V2", "client_address": "172.17.0.11:35560",
"client_tls": "false", "client_snappy": "false",
"client_deflate": "false",
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
},
{
"nsq_topic",
map[string]interface{}{
"depth": int64(28),
"backend_depth": int64(29),
"message_count": int64(30),
"channel_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "1.0.0-compat",
"topic": "t2"},
},
{
"nsq_channel",
map[string]interface{}{
"depth": int64(15),
"backend_depth": int64(16),
"inflight_count": int64(17),
"deferred_count": int64(18),
"message_count": int64(19),
"requeue_count": int64(20),
"timeout_count": int64(21),
"client_count": int64(1),
},
map[string]string{
"server_host": host,
"server_version": "1.0.0-compat",
"topic": "t2",
"channel": "c2",
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(22),
"inflight_count": int64(23),
"message_count": int64(24),
"finish_count": int64(25),
"requeue_count": int64(26),
},
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
"topic": "t2", "channel": "c2",
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
"client_version": "V2", "client_address": "172.17.0.8:48145",
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
"client_snappy": "true", "client_deflate": "true"},
},
}
for _, test := range tests {
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
}
}
// v1 version of localhost/stats?format=json reesponse body
var responseV1 = `
{
"version": "1.0.0-compat",
"health": "OK",
"start_time": 1452021674,
"topics": [
{
"topic_name": "t1",
"channels": [
{
"channel_name": "c1",
"depth": 0,
"backend_depth": 1,
"in_flight_count": 2,
"deferred_count": 3,
"message_count": 4,
"requeue_count": 5,
"timeout_count": 6,
"clients": [
{
"client_id": "373a715cd990",
"hostname": "373a715cd990",
"version": "V2",
"remote_address": "172.17.0.11:35560",
"state": 3,
"ready_count": 200,
"in_flight_count": 7,
"message_count": 8,
"finish_count": 9,
"requeue_count": 10,
"connect_ts": 1452021675,
"sample_rate": 11,
"deflate": false,
"snappy": false,
"user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5",
"tls": false,
"tls_cipher_suite": "",
"tls_version": "",
"tls_negotiated_protocol": "",
"tls_negotiated_protocol_is_mutual": false
}
],
"paused": false,
"e2e_processing_latency": {
"count": 0,
"percentiles": null
}
}
],
"depth": 12,
"backend_depth": 13,
"message_count": 14,
"paused": false,
"e2e_processing_latency": {
"count": 0,
"percentiles": null
}
},
{
"topic_name": "t2",
"channels": [
{
"channel_name": "c2",
"depth": 15,
"backend_depth": 16,
"in_flight_count": 17,
"deferred_count": 18,
"message_count": 19,
"requeue_count": 20,
"timeout_count": 21,
"clients": [
{
"client_id": "377569bd462b",
"hostname": "377569bd462b",
"version": "V2",
"remote_address": "172.17.0.8:48145",
"state": 3,
"ready_count": 22,
"in_flight_count": 23,
"message_count": 24,
"finish_count": 25,
"requeue_count": 26,
"connect_ts": 1452021678,
"sample_rate": 27,
"deflate": true,
"snappy": true,
"user_agent": "go-nsq\/1.0.5",
"tls": true,
"tls_cipher_suite": "",
"tls_version": "",
"tls_negotiated_protocol": "",
"tls_negotiated_protocol_is_mutual": false
}
],
"paused": false,
"e2e_processing_latency": {
"count": 0,
"percentiles": null
}
}
],
"depth": 28,
"backend_depth": 29,
"message_count": 30,
"paused": false,
"e2e_processing_latency": {
"count": 0,
"percentiles": null
}
}
]
}
`
// TestNSQStatsPreV1 is for backwards compatibility with nsq versions < 1.0
func TestNSQStatsPreV1(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responsePreV1)
})) }))
defer ts.Close() defer ts.Close()
@ -152,7 +409,7 @@ func TestNSQStats(t *testing.T) {
} }
} }
var response = ` var responsePreV1 = `
{ {
"status_code": 200, "status_code": 200,
"status_txt": "OK", "status_txt": "OK",