diff --git a/Godeps b/Godeps index c659227ca..8fb336d08 100644 --- a/Godeps +++ b/Godeps @@ -45,6 +45,7 @@ github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 github.com/mitchellh/mapstructure d0303fe809921458f417bcf828397a65db30a7e4 github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b +github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 92acc3270..cdf991c5a 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -54,6 +54,7 @@ following works: - github.com/miekg/dns [BSD](https://github.com/miekg/dns/blob/master/LICENSE) - github.com/naoina/go-stringutil [MIT](https://github.com/naoina/go-stringutil/blob/master/LICENSE) - github.com/naoina/toml [MIT](https://github.com/naoina/toml/blob/master/LICENSE) +- github.com/nats-io/gnatsd [MIT](https://github.com/nats-io/gnatsd/blob/master/LICENSE) - github.com/nats-io/go-nats [MIT](https://github.com/nats-io/go-nats/blob/master/LICENSE) - github.com/nats-io/nats [MIT](https://github.com/nats-io/nats/blob/master/LICENSE) - github.com/nats-io/nuid [MIT](https://github.com/nats-io/nuid/blob/master/LICENSE) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index aaf5b6ae7..fa78b3ff0 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -53,6 +53,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" + _ "github.com/influxdata/telegraf/plugins/inputs/nats" _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" diff --git a/plugins/inputs/nats/README.md b/plugins/inputs/nats/README.md new file mode 100644 index 000000000..3cd9ee7ac --- /dev/null +++ b/plugins/inputs/nats/README.md @@ -0,0 +1,12 @@ +# NATS Monitoring Input Plugin + +The [NATS](http://www.nats.io/about/) monitoring plugin reads from +specified NATS instance and submits metrics to InfluxDB. + +## Configuration + +```toml +[[inputs.nats]] + ## The address of the monitoring end-point of the NATS server + server = "http://localhost:8222" +``` diff --git a/plugins/inputs/nats/nats.go b/plugins/inputs/nats/nats.go new file mode 100644 index 000000000..5099bd9fd --- /dev/null +++ b/plugins/inputs/nats/nats.go @@ -0,0 +1,112 @@ +package nats + +import ( + "io/ioutil" + "net/http" + "net/url" + "path" + "time" + + "encoding/json" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + + gnatsd "github.com/nats-io/gnatsd/server" +) + +type Nats struct { + Server string + ResponseTimeout internal.Duration + + client *http.Client +} + +var sampleConfig = ` + ## The address of the monitoring endpoint of the NATS server + server = "http://localhost:1337" + + ## Maximum time to receive response + # response_timeout = "5s" +` + +func (n *Nats) SampleConfig() string { + return sampleConfig +} + +func (n *Nats) Description() string { + return "Provides metrics about the state of a NATS server" +} + +func (n *Nats) Gather(acc telegraf.Accumulator) error { + url, err := url.Parse(n.Server) + if err != nil { + return err + } + url.Path = path.Join(url.Path, "varz") + + if n.client == nil { + n.client = n.createHTTPClient() + } + resp, err := n.client.Get(url.String()) + if err != nil { + return err + } + defer resp.Body.Close() + + bytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + stats := new(gnatsd.Varz) + err = json.Unmarshal([]byte(bytes), &stats) + if err != nil { + return err + } + + acc.AddFields("nats", + map[string]interface{}{ + "in_msgs": stats.InMsgs, + "out_msgs": stats.OutMsgs, + "in_bytes": stats.InBytes, + "out_bytes": stats.OutBytes, + "uptime": stats.Now.Sub(stats.Start).Nanoseconds(), + "cores": stats.Cores, + "cpu": stats.CPU, + "mem": stats.Mem, + "connections": stats.Connections, + "total_connections": stats.TotalConnections, + "subscriptions": stats.Subscriptions, + "slow_consumers": stats.SlowConsumers, + "routes": stats.Routes, + "remotes": stats.Remotes, + }, + map[string]string{"server": n.Server}, + time.Now()) + + return nil +} + +func (n *Nats) createHTTPClient() *http.Client { + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + } + timeout := n.ResponseTimeout.Duration + if timeout == time.Duration(0) { + timeout = 5 * time.Second + } + return &http.Client{ + Transport: transport, + Timeout: timeout, + } +} + +func init() { + inputs.Add("nats", func() telegraf.Input { + return &Nats{ + Server: "http://localhost:8222", + } + }) +} diff --git a/plugins/inputs/nats/nats_test.go b/plugins/inputs/nats/nats_test.go new file mode 100644 index 000000000..7dd28006b --- /dev/null +++ b/plugins/inputs/nats/nats_test.go @@ -0,0 +1,112 @@ +package nats + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var sampleVarz = ` +{ + "server_id": "n2afhLHLl64Gcaj7S7jaNa", + "version": "1.0.0", + "go": "go1.8", + "host": "0.0.0.0", + "auth_required": false, + "ssl_required": false, + "tls_required": false, + "tls_verify": false, + "addr": "0.0.0.0", + "max_connections": 65536, + "ping_interval": 120000000000, + "ping_max": 2, + "http_host": "0.0.0.0", + "http_port": 1337, + "https_port": 0, + "auth_timeout": 1, + "max_control_line": 1024, + "cluster": { + "addr": "0.0.0.0", + "cluster_port": 0, + "auth_timeout": 1 + }, + "tls_timeout": 0.5, + "port": 4222, + "max_payload": 1048576, + "start": "1861-04-12T10:15:26.841483489-05:00", + "now": "2011-10-05T15:24:23.722084098-07:00", + "uptime": "150y5md237h8m57s", + "mem": 15581184, + "cores": 48, + "cpu": 9, + "connections": 5, + "total_connections": 109, + "routes": 1, + "remotes": 2, + "in_msgs": 74148556, + "out_msgs": 68863261, + "in_bytes": 946267004717, + "out_bytes": 948110960598, + "slow_consumers": 2, + "subscriptions": 4, + "http_req_stats": { + "/": 1, + "/connz": 100847, + "/routez": 0, + "/subsz": 1, + "/varz": 205785 + }, + "config_load_time": "2017-07-24T10:15:26.841483489-05:00" +} +` + +func TestMetricsCorrect(t *testing.T) { + var acc testutil.Accumulator + + srv := newTestNatsServer() + defer srv.Close() + + n := &Nats{Server: srv.URL} + err := n.Gather(&acc) + require.NoError(t, err) + + fields := map[string]interface{}{ + "in_msgs": int64(74148556), + "out_msgs": int64(68863261), + "in_bytes": int64(946267004717), + "out_bytes": int64(948110960598), + "uptime": int64(4748742536880600609), + "cores": 48, + "cpu": float64(9), + "mem": int64(15581184), + "connections": int(5), + "total_connections": uint64(109), + "subscriptions": uint32(4), + "slow_consumers": int64(2), + "routes": int(1), + "remotes": int(2), + } + tags := map[string]string{ + "server": srv.URL, + } + acc.AssertContainsTaggedFields(t, "nats", fields, tags) +} + +func newTestNatsServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + + switch r.URL.Path { + case "/varz": + rsp = sampleVarz + default: + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) +}