Fix NSQ input plugin when used with version 1.0.0-compat
(cherry picked from commit 0a6541dfa8
)
This commit is contained in:
parent
a9abfe8f08
commit
b15ec21ba7
|
@ -25,6 +25,7 @@ package nsq
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
@ -101,28 +102,42 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
|
|||
return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
|
||||
}
|
||||
|
||||
s := &NSQStats{}
|
||||
err = json.NewDecoder(r.Body).Decode(s)
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`Error reading body: %s`, err)
|
||||
}
|
||||
|
||||
data := &NSQStatsData{}
|
||||
err = json.Unmarshal(body, data)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`Error parsing response: %s`, err)
|
||||
}
|
||||
// Data was not parsed correctly attempt to use old format.
|
||||
if len(data.Version) < 1 {
|
||||
wrapper := &NSQStats{}
|
||||
err = json.Unmarshal(body, wrapper)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`Error parsing response: %s`, err)
|
||||
}
|
||||
data = &wrapper.Data
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
`server_host`: u.Host,
|
||||
`server_version`: s.Data.Version,
|
||||
`server_version`: data.Version,
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
if s.Data.Health == `OK` {
|
||||
if data.Health == `OK` {
|
||||
fields["server_count"] = int64(1)
|
||||
} else {
|
||||
fields["server_count"] = int64(0)
|
||||
}
|
||||
fields["topic_count"] = int64(len(s.Data.Topics))
|
||||
fields["topic_count"] = int64(len(data.Topics))
|
||||
|
||||
acc.AddFields("nsq_server", fields, tags)
|
||||
for _, t := range s.Data.Topics {
|
||||
topicStats(t, acc, u.Host, s.Data.Version)
|
||||
for _, t := range data.Topics {
|
||||
topicStats(t, acc, u.Host, data.Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -189,7 +204,6 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
|
|||
"server_version": version,
|
||||
"topic": topic,
|
||||
"channel": channel,
|
||||
"client_name": c.Name,
|
||||
"client_id": c.ID,
|
||||
"client_hostname": c.Hostname,
|
||||
"client_version": c.Version,
|
||||
|
@ -199,6 +213,9 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
|
|||
"client_snappy": strconv.FormatBool(c.Snappy),
|
||||
"client_deflate": strconv.FormatBool(c.Deflate),
|
||||
}
|
||||
if len(c.Name) > 0 {
|
||||
tags["client_name"] = c.Name
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"ready_count": c.ReadyCount,
|
||||
|
@ -248,7 +265,7 @@ type ChannelStats struct {
|
|||
}
|
||||
|
||||
type ClientStats struct {
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x
|
||||
ID string `json:"client_id"`
|
||||
Hostname string `json:"hostname"`
|
||||
Version string `json:"version"`
|
||||
|
|
|
@ -12,10 +12,267 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNSQStats(t *testing.T) {
|
||||
func TestNSQStatsV1(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, response)
|
||||
fmt.Fprintln(w, responseV1)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
n := &NSQ{
|
||||
Endpoints: []string{ts.URL},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err := acc.GatherError(n.Gather)
|
||||
require.NoError(t, err)
|
||||
|
||||
u, err := url.Parse(ts.URL)
|
||||
require.NoError(t, err)
|
||||
host := u.Host
|
||||
|
||||
// actually validate the tests
|
||||
tests := []struct {
|
||||
m string
|
||||
f map[string]interface{}
|
||||
g map[string]string
|
||||
}{
|
||||
{
|
||||
"nsq_server",
|
||||
map[string]interface{}{
|
||||
"server_count": int64(1),
|
||||
"topic_count": int64(2),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "1.0.0-compat",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_topic",
|
||||
map[string]interface{}{
|
||||
"depth": int64(12),
|
||||
"backend_depth": int64(13),
|
||||
"message_count": int64(14),
|
||||
"channel_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "1.0.0-compat",
|
||||
"topic": "t1"},
|
||||
},
|
||||
{
|
||||
"nsq_channel",
|
||||
map[string]interface{}{
|
||||
"depth": int64(0),
|
||||
"backend_depth": int64(1),
|
||||
"inflight_count": int64(2),
|
||||
"deferred_count": int64(3),
|
||||
"message_count": int64(4),
|
||||
"requeue_count": int64(5),
|
||||
"timeout_count": int64(6),
|
||||
"client_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "1.0.0-compat",
|
||||
"topic": "t1",
|
||||
"channel": "c1",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_client",
|
||||
map[string]interface{}{
|
||||
"ready_count": int64(200),
|
||||
"inflight_count": int64(7),
|
||||
"message_count": int64(8),
|
||||
"finish_count": int64(9),
|
||||
"requeue_count": int64(10),
|
||||
},
|
||||
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
|
||||
"topic": "t1", "channel": "c1",
|
||||
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
|
||||
"client_version": "V2", "client_address": "172.17.0.11:35560",
|
||||
"client_tls": "false", "client_snappy": "false",
|
||||
"client_deflate": "false",
|
||||
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
|
||||
},
|
||||
{
|
||||
"nsq_topic",
|
||||
map[string]interface{}{
|
||||
"depth": int64(28),
|
||||
"backend_depth": int64(29),
|
||||
"message_count": int64(30),
|
||||
"channel_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "1.0.0-compat",
|
||||
"topic": "t2"},
|
||||
},
|
||||
{
|
||||
"nsq_channel",
|
||||
map[string]interface{}{
|
||||
"depth": int64(15),
|
||||
"backend_depth": int64(16),
|
||||
"inflight_count": int64(17),
|
||||
"deferred_count": int64(18),
|
||||
"message_count": int64(19),
|
||||
"requeue_count": int64(20),
|
||||
"timeout_count": int64(21),
|
||||
"client_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "1.0.0-compat",
|
||||
"topic": "t2",
|
||||
"channel": "c2",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_client",
|
||||
map[string]interface{}{
|
||||
"ready_count": int64(22),
|
||||
"inflight_count": int64(23),
|
||||
"message_count": int64(24),
|
||||
"finish_count": int64(25),
|
||||
"requeue_count": int64(26),
|
||||
},
|
||||
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
|
||||
"topic": "t2", "channel": "c2",
|
||||
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
|
||||
"client_version": "V2", "client_address": "172.17.0.8:48145",
|
||||
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
|
||||
"client_snappy": "true", "client_deflate": "true"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
|
||||
}
|
||||
}
|
||||
|
||||
// v1 version of localhost/stats?format=json reesponse body
|
||||
var responseV1 = `
|
||||
{
|
||||
"version": "1.0.0-compat",
|
||||
"health": "OK",
|
||||
"start_time": 1452021674,
|
||||
"topics": [
|
||||
{
|
||||
"topic_name": "t1",
|
||||
"channels": [
|
||||
{
|
||||
"channel_name": "c1",
|
||||
"depth": 0,
|
||||
"backend_depth": 1,
|
||||
"in_flight_count": 2,
|
||||
"deferred_count": 3,
|
||||
"message_count": 4,
|
||||
"requeue_count": 5,
|
||||
"timeout_count": 6,
|
||||
"clients": [
|
||||
{
|
||||
"client_id": "373a715cd990",
|
||||
"hostname": "373a715cd990",
|
||||
"version": "V2",
|
||||
"remote_address": "172.17.0.11:35560",
|
||||
"state": 3,
|
||||
"ready_count": 200,
|
||||
"in_flight_count": 7,
|
||||
"message_count": 8,
|
||||
"finish_count": 9,
|
||||
"requeue_count": 10,
|
||||
"connect_ts": 1452021675,
|
||||
"sample_rate": 11,
|
||||
"deflate": false,
|
||||
"snappy": false,
|
||||
"user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5",
|
||||
"tls": false,
|
||||
"tls_cipher_suite": "",
|
||||
"tls_version": "",
|
||||
"tls_negotiated_protocol": "",
|
||||
"tls_negotiated_protocol_is_mutual": false
|
||||
}
|
||||
],
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"depth": 12,
|
||||
"backend_depth": 13,
|
||||
"message_count": 14,
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"topic_name": "t2",
|
||||
"channels": [
|
||||
{
|
||||
"channel_name": "c2",
|
||||
"depth": 15,
|
||||
"backend_depth": 16,
|
||||
"in_flight_count": 17,
|
||||
"deferred_count": 18,
|
||||
"message_count": 19,
|
||||
"requeue_count": 20,
|
||||
"timeout_count": 21,
|
||||
"clients": [
|
||||
{
|
||||
"client_id": "377569bd462b",
|
||||
"hostname": "377569bd462b",
|
||||
"version": "V2",
|
||||
"remote_address": "172.17.0.8:48145",
|
||||
"state": 3,
|
||||
"ready_count": 22,
|
||||
"in_flight_count": 23,
|
||||
"message_count": 24,
|
||||
"finish_count": 25,
|
||||
"requeue_count": 26,
|
||||
"connect_ts": 1452021678,
|
||||
"sample_rate": 27,
|
||||
"deflate": true,
|
||||
"snappy": true,
|
||||
"user_agent": "go-nsq\/1.0.5",
|
||||
"tls": true,
|
||||
"tls_cipher_suite": "",
|
||||
"tls_version": "",
|
||||
"tls_negotiated_protocol": "",
|
||||
"tls_negotiated_protocol_is_mutual": false
|
||||
}
|
||||
],
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"depth": 28,
|
||||
"backend_depth": 29,
|
||||
"message_count": 30,
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
`
|
||||
|
||||
// TestNSQStatsPreV1 is for backwards compatibility with nsq versions < 1.0
|
||||
func TestNSQStatsPreV1(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, responsePreV1)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
|
@ -152,7 +409,7 @@ func TestNSQStats(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
var response = `
|
||||
var responsePreV1 = `
|
||||
{
|
||||
"status_code": 200,
|
||||
"status_txt": "OK",
|
||||
|
|
Loading…
Reference in New Issue