Fix NSQ input plugin when used with version 1.0.0-compat
This commit is contained in:
parent
e99099f49c
commit
5be114ca6f
|
@ -25,6 +25,7 @@ package nsq
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -101,28 +102,42 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
|
||||||
return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
|
return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &NSQStats{}
|
body, err := ioutil.ReadAll(r.Body)
|
||||||
err = json.NewDecoder(r.Body).Decode(s)
|
if err != nil {
|
||||||
|
return fmt.Errorf(`Error reading body: %s`, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data := &NSQStatsData{}
|
||||||
|
err = json.Unmarshal(body, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf(`Error parsing response: %s`, err)
|
return fmt.Errorf(`Error parsing response: %s`, err)
|
||||||
}
|
}
|
||||||
|
// Data was not parsed correctly attempt to use old format.
|
||||||
|
if len(data.Version) < 1 {
|
||||||
|
wrapper := &NSQStats{}
|
||||||
|
err = json.Unmarshal(body, wrapper)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf(`Error parsing response: %s`, err)
|
||||||
|
}
|
||||||
|
data = &wrapper.Data
|
||||||
|
}
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
`server_host`: u.Host,
|
`server_host`: u.Host,
|
||||||
`server_version`: s.Data.Version,
|
`server_version`: data.Version,
|
||||||
}
|
}
|
||||||
|
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
if s.Data.Health == `OK` {
|
if data.Health == `OK` {
|
||||||
fields["server_count"] = int64(1)
|
fields["server_count"] = int64(1)
|
||||||
} else {
|
} else {
|
||||||
fields["server_count"] = int64(0)
|
fields["server_count"] = int64(0)
|
||||||
}
|
}
|
||||||
fields["topic_count"] = int64(len(s.Data.Topics))
|
fields["topic_count"] = int64(len(data.Topics))
|
||||||
|
|
||||||
acc.AddFields("nsq_server", fields, tags)
|
acc.AddFields("nsq_server", fields, tags)
|
||||||
for _, t := range s.Data.Topics {
|
for _, t := range data.Topics {
|
||||||
topicStats(t, acc, u.Host, s.Data.Version)
|
topicStats(t, acc, u.Host, data.Version)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -189,7 +204,6 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
|
||||||
"server_version": version,
|
"server_version": version,
|
||||||
"topic": topic,
|
"topic": topic,
|
||||||
"channel": channel,
|
"channel": channel,
|
||||||
"client_name": c.Name,
|
|
||||||
"client_id": c.ID,
|
"client_id": c.ID,
|
||||||
"client_hostname": c.Hostname,
|
"client_hostname": c.Hostname,
|
||||||
"client_version": c.Version,
|
"client_version": c.Version,
|
||||||
|
@ -199,6 +213,9 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic,
|
||||||
"client_snappy": strconv.FormatBool(c.Snappy),
|
"client_snappy": strconv.FormatBool(c.Snappy),
|
||||||
"client_deflate": strconv.FormatBool(c.Deflate),
|
"client_deflate": strconv.FormatBool(c.Deflate),
|
||||||
}
|
}
|
||||||
|
if len(c.Name) > 0 {
|
||||||
|
tags["client_name"] = c.Name
|
||||||
|
}
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
"ready_count": c.ReadyCount,
|
"ready_count": c.ReadyCount,
|
||||||
|
@ -248,7 +265,7 @@ type ChannelStats struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientStats struct {
|
type ClientStats struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x
|
||||||
ID string `json:"client_id"`
|
ID string `json:"client_id"`
|
||||||
Hostname string `json:"hostname"`
|
Hostname string `json:"hostname"`
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
|
|
|
@ -12,10 +12,267 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNSQStats(t *testing.T) {
|
func TestNSQStatsV1(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
fmt.Fprintln(w, response)
|
fmt.Fprintln(w, responseV1)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
n := &NSQ{
|
||||||
|
Endpoints: []string{ts.URL},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(n.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
u, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
host := u.Host
|
||||||
|
|
||||||
|
// actually validate the tests
|
||||||
|
tests := []struct {
|
||||||
|
m string
|
||||||
|
f map[string]interface{}
|
||||||
|
g map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"nsq_server",
|
||||||
|
map[string]interface{}{
|
||||||
|
"server_count": int64(1),
|
||||||
|
"topic_count": int64(2),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"server_host": host,
|
||||||
|
"server_version": "1.0.0-compat",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_topic",
|
||||||
|
map[string]interface{}{
|
||||||
|
"depth": int64(12),
|
||||||
|
"backend_depth": int64(13),
|
||||||
|
"message_count": int64(14),
|
||||||
|
"channel_count": int64(1),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"server_host": host,
|
||||||
|
"server_version": "1.0.0-compat",
|
||||||
|
"topic": "t1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_channel",
|
||||||
|
map[string]interface{}{
|
||||||
|
"depth": int64(0),
|
||||||
|
"backend_depth": int64(1),
|
||||||
|
"inflight_count": int64(2),
|
||||||
|
"deferred_count": int64(3),
|
||||||
|
"message_count": int64(4),
|
||||||
|
"requeue_count": int64(5),
|
||||||
|
"timeout_count": int64(6),
|
||||||
|
"client_count": int64(1),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"server_host": host,
|
||||||
|
"server_version": "1.0.0-compat",
|
||||||
|
"topic": "t1",
|
||||||
|
"channel": "c1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_client",
|
||||||
|
map[string]interface{}{
|
||||||
|
"ready_count": int64(200),
|
||||||
|
"inflight_count": int64(7),
|
||||||
|
"message_count": int64(8),
|
||||||
|
"finish_count": int64(9),
|
||||||
|
"requeue_count": int64(10),
|
||||||
|
},
|
||||||
|
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
|
||||||
|
"topic": "t1", "channel": "c1",
|
||||||
|
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
|
||||||
|
"client_version": "V2", "client_address": "172.17.0.11:35560",
|
||||||
|
"client_tls": "false", "client_snappy": "false",
|
||||||
|
"client_deflate": "false",
|
||||||
|
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_topic",
|
||||||
|
map[string]interface{}{
|
||||||
|
"depth": int64(28),
|
||||||
|
"backend_depth": int64(29),
|
||||||
|
"message_count": int64(30),
|
||||||
|
"channel_count": int64(1),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"server_host": host,
|
||||||
|
"server_version": "1.0.0-compat",
|
||||||
|
"topic": "t2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_channel",
|
||||||
|
map[string]interface{}{
|
||||||
|
"depth": int64(15),
|
||||||
|
"backend_depth": int64(16),
|
||||||
|
"inflight_count": int64(17),
|
||||||
|
"deferred_count": int64(18),
|
||||||
|
"message_count": int64(19),
|
||||||
|
"requeue_count": int64(20),
|
||||||
|
"timeout_count": int64(21),
|
||||||
|
"client_count": int64(1),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"server_host": host,
|
||||||
|
"server_version": "1.0.0-compat",
|
||||||
|
"topic": "t2",
|
||||||
|
"channel": "c2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"nsq_client",
|
||||||
|
map[string]interface{}{
|
||||||
|
"ready_count": int64(22),
|
||||||
|
"inflight_count": int64(23),
|
||||||
|
"message_count": int64(24),
|
||||||
|
"finish_count": int64(25),
|
||||||
|
"requeue_count": int64(26),
|
||||||
|
},
|
||||||
|
map[string]string{"server_host": host, "server_version": "1.0.0-compat",
|
||||||
|
"topic": "t2", "channel": "c2",
|
||||||
|
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
|
||||||
|
"client_version": "V2", "client_address": "172.17.0.8:48145",
|
||||||
|
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
|
||||||
|
"client_snappy": "true", "client_deflate": "true"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// v1 version of localhost/stats?format=json reesponse body
|
||||||
|
var responseV1 = `
|
||||||
|
{
|
||||||
|
"version": "1.0.0-compat",
|
||||||
|
"health": "OK",
|
||||||
|
"start_time": 1452021674,
|
||||||
|
"topics": [
|
||||||
|
{
|
||||||
|
"topic_name": "t1",
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"channel_name": "c1",
|
||||||
|
"depth": 0,
|
||||||
|
"backend_depth": 1,
|
||||||
|
"in_flight_count": 2,
|
||||||
|
"deferred_count": 3,
|
||||||
|
"message_count": 4,
|
||||||
|
"requeue_count": 5,
|
||||||
|
"timeout_count": 6,
|
||||||
|
"clients": [
|
||||||
|
{
|
||||||
|
"client_id": "373a715cd990",
|
||||||
|
"hostname": "373a715cd990",
|
||||||
|
"version": "V2",
|
||||||
|
"remote_address": "172.17.0.11:35560",
|
||||||
|
"state": 3,
|
||||||
|
"ready_count": 200,
|
||||||
|
"in_flight_count": 7,
|
||||||
|
"message_count": 8,
|
||||||
|
"finish_count": 9,
|
||||||
|
"requeue_count": 10,
|
||||||
|
"connect_ts": 1452021675,
|
||||||
|
"sample_rate": 11,
|
||||||
|
"deflate": false,
|
||||||
|
"snappy": false,
|
||||||
|
"user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5",
|
||||||
|
"tls": false,
|
||||||
|
"tls_cipher_suite": "",
|
||||||
|
"tls_version": "",
|
||||||
|
"tls_negotiated_protocol": "",
|
||||||
|
"tls_negotiated_protocol_is_mutual": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"paused": false,
|
||||||
|
"e2e_processing_latency": {
|
||||||
|
"count": 0,
|
||||||
|
"percentiles": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"depth": 12,
|
||||||
|
"backend_depth": 13,
|
||||||
|
"message_count": 14,
|
||||||
|
"paused": false,
|
||||||
|
"e2e_processing_latency": {
|
||||||
|
"count": 0,
|
||||||
|
"percentiles": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topic_name": "t2",
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"channel_name": "c2",
|
||||||
|
"depth": 15,
|
||||||
|
"backend_depth": 16,
|
||||||
|
"in_flight_count": 17,
|
||||||
|
"deferred_count": 18,
|
||||||
|
"message_count": 19,
|
||||||
|
"requeue_count": 20,
|
||||||
|
"timeout_count": 21,
|
||||||
|
"clients": [
|
||||||
|
{
|
||||||
|
"client_id": "377569bd462b",
|
||||||
|
"hostname": "377569bd462b",
|
||||||
|
"version": "V2",
|
||||||
|
"remote_address": "172.17.0.8:48145",
|
||||||
|
"state": 3,
|
||||||
|
"ready_count": 22,
|
||||||
|
"in_flight_count": 23,
|
||||||
|
"message_count": 24,
|
||||||
|
"finish_count": 25,
|
||||||
|
"requeue_count": 26,
|
||||||
|
"connect_ts": 1452021678,
|
||||||
|
"sample_rate": 27,
|
||||||
|
"deflate": true,
|
||||||
|
"snappy": true,
|
||||||
|
"user_agent": "go-nsq\/1.0.5",
|
||||||
|
"tls": true,
|
||||||
|
"tls_cipher_suite": "",
|
||||||
|
"tls_version": "",
|
||||||
|
"tls_negotiated_protocol": "",
|
||||||
|
"tls_negotiated_protocol_is_mutual": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"paused": false,
|
||||||
|
"e2e_processing_latency": {
|
||||||
|
"count": 0,
|
||||||
|
"percentiles": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"depth": 28,
|
||||||
|
"backend_depth": 29,
|
||||||
|
"message_count": 30,
|
||||||
|
"paused": false,
|
||||||
|
"e2e_processing_latency": {
|
||||||
|
"count": 0,
|
||||||
|
"percentiles": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
`
|
||||||
|
|
||||||
|
// TestNSQStatsPreV1 is for backwards compatibility with nsq versions < 1.0
|
||||||
|
func TestNSQStatsPreV1(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
fmt.Fprintln(w, responsePreV1)
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
|
@ -152,7 +409,7 @@ func TestNSQStats(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var response = `
|
var responsePreV1 = `
|
||||||
{
|
{
|
||||||
"status_code": 200,
|
"status_code": 200,
|
||||||
"status_txt": "OK",
|
"status_txt": "OK",
|
||||||
|
|
Loading…
Reference in New Issue