From bc9a71e1117b07599cbdbb45a783beb1624b17fb Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Sat, 5 Dec 2015 14:15:58 -0800 Subject: [PATCH 1/5] Add expvar plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present in expvars under the "memstats" key but does not follow the expvar format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /expvar/' name: expvar_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: expvar_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: expvar_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: expvar_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: expvar_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: expvar_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ``` --- plugins/all/all.go | 1 + plugins/expvar/README.md | 74 ++++++++++++++++ plugins/expvar/expvar.go | 154 ++++++++++++++++++++++++++++++++++ plugins/expvar/expvar_test.go | 104 +++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 plugins/expvar/README.md create mode 100644 plugins/expvar/expvar.go create mode 100644 plugins/expvar/expvar_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 676d25707..e6303e066 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/exec" + _ "github.com/influxdb/telegraf/plugins/expvar" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/jolokia" diff --git a/plugins/expvar/README.md b/plugins/expvar/README.md new file mode 100644 index 000000000..724a46b33 --- /dev/null +++ b/plugins/expvar/README.md @@ -0,0 +1,74 @@ +# expvar plugin + +The expvar plugin collects InfluxDB-style expvar data from JSON endpoints. + +With a configuration of: + +```toml +[plugins.expvar] + [[plugins.expvar.services]] + name = "produce" + urls = [ + "http://127.0.0.1:8086/debug/vars", + "http://192.168.2.1:8086/debug/vars" + ] +``` + +And if 127.0.0.1 responds with this JSON: + +```json +{ + "k1": { + "name": "fruit", + "tags": { + "kind": "apple" + }, + "values": { + "inventory": 371, + "sold": 112 + } + }, + "k2": { + "name": "fruit", + "tags": { + "kind": "banana" + }, + "values": { + "inventory": 1000, + "sold": 403 + } + } +} +``` + +And if 192.168.2.1 responds like so: + +```json +{ + "k3": { + "name": "transactions", + "tags": {}, + "values": { + "total": 100, + "balance": 184.75 + } + } +} +``` + +Then the collected metrics will be: + +``` +expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 + +expvar_produce_transactions,expvar_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +``` + +There are two important details to note about the collected metrics: + +1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats. +JSON encoders usually don't print the fractional part for round floats. +Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats. + +2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics. diff --git a/plugins/expvar/expvar.go b/plugins/expvar/expvar.go new file mode 100644 index 000000000..cacd68e43 --- /dev/null +++ b/plugins/expvar/expvar.go @@ -0,0 +1,154 @@ +package expvar + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + "sync" + + "github.com/influxdb/telegraf/plugins" +) + +var sampleConfig = ` + # Specify services via an array of tables + [[plugins.expvar.services]] + # Name for the service being polled + name = "influxdb" + + # Multiple URLs from which to read expvars + urls = [ + "http://localhost:8086/debug/vars" + ] +` + +type Expvar struct { + Services []Service +} + +type Service struct { + Name string + URLs []string `toml:"urls"` +} + +func (*Expvar) Description() string { + return "Read InfluxDB-style expvar metrics from one or more HTTP endpoints" +} + +func (*Expvar) SampleConfig() string { + return sampleConfig +} + +func (e *Expvar) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + totalURLs := 0 + for _, service := range e.Services { + totalURLs += len(service.URLs) + } + errorChannel := make(chan error, totalURLs) + + for _, service := range e.Services { + for _, u := range service.URLs { + wg.Add(1) + go func(service Service, url string) { + defer wg.Done() + if err := e.gatherURL(acc, service, url); err != nil { + errorChannel <- err + } + }(service, u) + } + } + + wg.Wait() + close(errorChannel) + + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) +} + +type point struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// Gathers data from a particular URL +// Parameters: +// acc : The telegraf Accumulator to use +// service: the service being queried +// url : endpoint to send request to +// +// Returns: +// error: Any error that may have occurred +func (e *Expvar) gatherURL( + acc plugins.Accumulator, + service Service, + url string, +) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + // Can't predict what all is going to be in the response, so decode the top keys one at a time. + dec := json.NewDecoder(resp.Body) + + // Parse beginning of object + if t, err := dec.Token(); err != nil { + return err + } else if t != json.Delim('{') { + return errors.New("expvars must be a JSON object") + } + + // Loop through rest of object + for { + // Nothing left in this object, we're done + if !dec.More() { + break + } + + // Read in a string key. We actually don't care about the top-level keys + _, err := dec.Token() + if err != nil { + return err + } + + // Attempt to parse a whole object into a point. + // It might be a non-object, like a string or array. + // If we fail to decode it into a point, ignore it and move on. + var p point + if err := dec.Decode(&p); err != nil { + continue + } + + if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 { + continue + } + + p.Tags["expvar_url"] = url + + acc.AddFields( + service.Name+"_"+p.Name, + p.Values, + p.Tags, + ) + } + + return nil +} + +func init() { + plugins.Add("expvar", func() plugins.Plugin { + return &Expvar{} + }) +} diff --git a/plugins/expvar/expvar_test.go b/plugins/expvar/expvar_test.go new file mode 100644 index 000000000..a60147e98 --- /dev/null +++ b/plugins/expvar/expvar_test.go @@ -0,0 +1,104 @@ +package expvar + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestBasic(t *testing.T) { + js := ` +{ + "_1": { + "name": "foo", + "tags": { + "id": "ex1" + }, + "values": { + "i": -1, + "f": 0.5, + "b": true, + "s": "string" + } + }, + "ignored": { + "willBeRecorded": false + }, + "ignoredAndNested": { + "hash": { + "is": "nested" + } + }, + "array": [ + "makes parsing more difficult than necessary" + ], + "string": "makes parsing more difficult than necessary", + "_2": { + "name": "bar", + "tags": { + "id": "ex2" + }, + "values": { + "x": "x" + } + }, + "pointWithoutFields_willNotBeIncluded": { + "name": "asdf", + "tags": { + "id": "ex3" + }, + "values": {} + } +} +` + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/expvar" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + expvar := &Expvar{ + Services: []Service{ + { + Name: "test", + URLs: []string{fakeServer.URL + "/expvar"}, + }, + }, + } + + var acc testutil.Accumulator + require.NoError(t, expvar.Gather(&acc)) + + require.Len(t, acc.Points, 2) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_foo", + map[string]interface{}{ + // JSON will truncate floats to integer representations. + // Since there's no distinction in JSON, we can't assume it's an int. + "i": -1.0, + "f": 0.5, + "b": true, + "s": "string", + }, + map[string]string{ + "id": "ex1", + "expvar_url": fakeServer.URL + "/expvar", + }, + )) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_bar", + map[string]interface{}{ + "x": "x", + }, + map[string]string{ + "id": "ex2", + "expvar_url": fakeServer.URL + "/expvar", + }, + )) +} From b267c218e205cfcd123979ec705738db4601c038 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Mon, 7 Dec 2015 22:47:43 -0800 Subject: [PATCH 2/5] Rename expvar plugin -> influxdbjson --- plugins/all/all.go | 2 +- plugins/{expvar => influxdbjson}/README.md | 23 ++-- .../influxdb_json.go} | 101 ++++++++++-------- .../influxdb_json_test.go} | 25 ++--- 4 files changed, 77 insertions(+), 74 deletions(-) rename plugins/{expvar => influxdbjson}/README.md (64%) rename plugins/{expvar/expvar.go => influxdbjson/influxdb_json.go} (51%) rename plugins/{expvar/expvar_test.go => influxdbjson/influxdb_json_test.go} (78%) diff --git a/plugins/all/all.go b/plugins/all/all.go index e6303e066..c280f6bf4 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -7,9 +7,9 @@ import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/exec" - _ "github.com/influxdb/telegraf/plugins/expvar" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/influxdbjson" _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" diff --git a/plugins/expvar/README.md b/plugins/influxdbjson/README.md similarity index 64% rename from plugins/expvar/README.md rename to plugins/influxdbjson/README.md index 724a46b33..fc292a0dc 100644 --- a/plugins/expvar/README.md +++ b/plugins/influxdbjson/README.md @@ -1,17 +1,16 @@ -# expvar plugin +# influxdbjson plugin -The expvar plugin collects InfluxDB-style expvar data from JSON endpoints. +The influxdbjson plugin collects InfluxDB-formatted data from JSON endpoints. With a configuration of: ```toml -[plugins.expvar] - [[plugins.expvar.services]] - name = "produce" - urls = [ - "http://127.0.0.1:8086/debug/vars", - "http://192.168.2.1:8086/debug/vars" - ] +[[plugins.influxdbjson]] + name = "produce" + urls = [ + "http://127.0.0.1:8086/debug/vars", + "http://192.168.2.1:8086/debug/vars" + ] ``` And if 127.0.0.1 responds with this JSON: @@ -59,10 +58,10 @@ And if 192.168.2.1 responds like so: Then the collected metrics will be: ``` -expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 -expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 +influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 -expvar_produce_transactions,expvar_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +influxdbjson_produce_transactions,influxdbjson_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 ``` There are two important details to note about the collected metrics: diff --git a/plugins/expvar/expvar.go b/plugins/influxdbjson/influxdb_json.go similarity index 51% rename from plugins/expvar/expvar.go rename to plugins/influxdbjson/influxdb_json.go index cacd68e43..f8837cd1d 100644 --- a/plugins/expvar/expvar.go +++ b/plugins/influxdbjson/influxdb_json.go @@ -1,4 +1,4 @@ -package expvar +package influxdbjson import ( "encoding/json" @@ -10,54 +10,61 @@ import ( "github.com/influxdb/telegraf/plugins" ) -var sampleConfig = ` - # Specify services via an array of tables - [[plugins.expvar.services]] - # Name for the service being polled - name = "influxdb" - - # Multiple URLs from which to read expvars - urls = [ - "http://localhost:8086/debug/vars" - ] -` - -type Expvar struct { - Services []Service -} - -type Service struct { +type InfluxDBJSON struct { Name string URLs []string `toml:"urls"` } -func (*Expvar) Description() string { - return "Read InfluxDB-style expvar metrics from one or more HTTP endpoints" +func (*InfluxDBJSON) Description() string { + return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints" } -func (*Expvar) SampleConfig() string { - return sampleConfig +func (*InfluxDBJSON) SampleConfig() string { + return ` + # Reads InfluxDB-formatted JSON from given URLs. For example, + # monitoring a URL which responded with a JSON object formatted like this: + # + # { + # "(ignored_key)": { + # "name": "connections", + # "tags": { + # "host": "foo" + # }, + # "values": { + # "avg_ms": 1.234, + # } + # } + # } + # + # with configuration of { name = "server", urls = ["http://127.0.0.1:8086/x"] } + # + # Would result in this recorded metric: + # + # influxdbjson_server_connections,influxdbjson_url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 + [[plugins.influxdbjson]] + # Name to use for measurement + name = "influxdb" + + # Multiple URLs from which to read InfluxDB-formatted JSON + urls = [ + "http://localhost:8086/debug/vars" + ] +` } -func (e *Expvar) Gather(acc plugins.Accumulator) error { +func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error { var wg sync.WaitGroup - totalURLs := 0 - for _, service := range e.Services { - totalURLs += len(service.URLs) - } - errorChannel := make(chan error, totalURLs) + errorChannel := make(chan error, len(i.URLs)) - for _, service := range e.Services { - for _, u := range service.URLs { - wg.Add(1) - go func(service Service, url string) { - defer wg.Done() - if err := e.gatherURL(acc, service, url); err != nil { - errorChannel <- err - } - }(service, u) - } + for _, u := range i.URLs { + wg.Add(1) + go func(url string) { + defer wg.Done() + if err := i.gatherURL(acc, url); err != nil { + errorChannel <- err + } + }(u) } wg.Wait() @@ -84,14 +91,12 @@ type point struct { // Gathers data from a particular URL // Parameters: // acc : The telegraf Accumulator to use -// service: the service being queried // url : endpoint to send request to // // Returns: // error: Any error that may have occurred -func (e *Expvar) gatherURL( +func (i *InfluxDBJSON) gatherURL( acc plugins.Accumulator, - service Service, url string, ) error { resp, err := http.Get(url) @@ -107,7 +112,7 @@ func (e *Expvar) gatherURL( if t, err := dec.Token(); err != nil { return err } else if t != json.Delim('{') { - return errors.New("expvars must be a JSON object") + return errors.New("document root must be a JSON object") } // Loop through rest of object @@ -117,7 +122,7 @@ func (e *Expvar) gatherURL( break } - // Read in a string key. We actually don't care about the top-level keys + // Read in a string key. We don't do anything with the top-level keys, so it's discarded. _, err := dec.Token() if err != nil { return err @@ -131,14 +136,16 @@ func (e *Expvar) gatherURL( continue } + // If the object was a point, but was not fully initialized, ignore it and move on. if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 { continue } - p.Tags["expvar_url"] = url + // Add a tag to indicate the source of the data. + p.Tags["influxdbjson_url"] = url acc.AddFields( - service.Name+"_"+p.Name, + i.Name+"_"+p.Name, p.Values, p.Tags, ) @@ -148,7 +155,7 @@ func (e *Expvar) gatherURL( } func init() { - plugins.Add("expvar", func() plugins.Plugin { - return &Expvar{} + plugins.Add("influxdbjson", func() plugins.Plugin { + return &InfluxDBJSON{} }) } diff --git a/plugins/expvar/expvar_test.go b/plugins/influxdbjson/influxdb_json_test.go similarity index 78% rename from plugins/expvar/expvar_test.go rename to plugins/influxdbjson/influxdb_json_test.go index a60147e98..ed75f4fb8 100644 --- a/plugins/expvar/expvar_test.go +++ b/plugins/influxdbjson/influxdb_json_test.go @@ -1,10 +1,11 @@ -package expvar +package influxdbjson_test import ( "net/http" "net/http/httptest" "testing" + "github.com/influxdb/telegraf/plugins/influxdbjson" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -55,7 +56,7 @@ func TestBasic(t *testing.T) { } ` fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/expvar" { + if r.URL.Path == "/endpoint" { _, _ = w.Write([]byte(js)) } else { w.WriteHeader(http.StatusNotFound) @@ -63,17 +64,13 @@ func TestBasic(t *testing.T) { })) defer fakeServer.Close() - expvar := &Expvar{ - Services: []Service{ - { - Name: "test", - URLs: []string{fakeServer.URL + "/expvar"}, - }, - }, + plugin := &influxdbjson.InfluxDBJSON{ + Name: "test", + URLs: []string{fakeServer.URL + "/endpoint"}, } var acc testutil.Accumulator - require.NoError(t, expvar.Gather(&acc)) + require.NoError(t, plugin.Gather(&acc)) require.Len(t, acc.Points, 2) require.NoError(t, acc.ValidateTaggedFieldsValue( @@ -87,8 +84,8 @@ func TestBasic(t *testing.T) { "s": "string", }, map[string]string{ - "id": "ex1", - "expvar_url": fakeServer.URL + "/expvar", + "id": "ex1", + "influxdbjson_url": fakeServer.URL + "/endpoint", }, )) require.NoError(t, acc.ValidateTaggedFieldsValue( @@ -97,8 +94,8 @@ func TestBasic(t *testing.T) { "x": "x", }, map[string]string{ - "id": "ex2", - "expvar_url": fakeServer.URL + "/expvar", + "id": "ex2", + "influxdbjson_url": fakeServer.URL + "/endpoint", }, )) } From 78e73ed78c84964b8cced2884902417a0b01f148 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Mon, 7 Dec 2015 23:27:40 -0800 Subject: [PATCH 3/5] Fix whitespace --- plugins/influxdbjson/README.md | 10 ++++----- plugins/influxdbjson/influxdb_json.go | 32 +++++++++++++-------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/plugins/influxdbjson/README.md b/plugins/influxdbjson/README.md index fc292a0dc..e4c4bf118 100644 --- a/plugins/influxdbjson/README.md +++ b/plugins/influxdbjson/README.md @@ -6,11 +6,11 @@ With a configuration of: ```toml [[plugins.influxdbjson]] - name = "produce" - urls = [ - "http://127.0.0.1:8086/debug/vars", - "http://192.168.2.1:8086/debug/vars" - ] + name = "produce" + urls = [ + "http://127.0.0.1:8086/debug/vars", + "http://192.168.2.1:8086/debug/vars" + ] ``` And if 127.0.0.1 responds with this JSON: diff --git a/plugins/influxdbjson/influxdb_json.go b/plugins/influxdbjson/influxdb_json.go index f8837cd1d..f0066ab78 100644 --- a/plugins/influxdbjson/influxdb_json.go +++ b/plugins/influxdbjson/influxdb_json.go @@ -21,27 +21,27 @@ func (*InfluxDBJSON) Description() string { func (*InfluxDBJSON) SampleConfig() string { return ` - # Reads InfluxDB-formatted JSON from given URLs. For example, + # Reads InfluxDB-formatted JSON from given URLs. For example, # monitoring a URL which responded with a JSON object formatted like this: # - # { - # "(ignored_key)": { - # "name": "connections", - # "tags": { - # "host": "foo" - # }, - # "values": { - # "avg_ms": 1.234, - # } - # } - # } - # + # { + # "(ignored_key)": { + # "name": "connections", + # "tags": { + # "host": "foo" + # }, + # "values": { + # "avg_ms": 1.234, + # } + # } + # } + # # with configuration of { name = "server", urls = ["http://127.0.0.1:8086/x"] } - # + # # Would result in this recorded metric: # - # influxdbjson_server_connections,influxdbjson_url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 - [[plugins.influxdbjson]] + # influxdbjson_server_connections,influxdbjson_url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 + [[plugins.influxdbjson]] # Name to use for measurement name = "influxdb" From 65b9d2fcc9749c7bef98a0e6478036b0c193999e Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Thu, 10 Dec 2015 16:25:17 -0800 Subject: [PATCH 4/5] Rename `influxdbjson_url` tag to `url` --- plugins/influxdbjson/README.md | 6 +++--- plugins/influxdbjson/influxdb_json.go | 6 +++--- plugins/influxdbjson/influxdb_json_test.go | 22 +++++++++++----------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/plugins/influxdbjson/README.md b/plugins/influxdbjson/README.md index e4c4bf118..f26c937f2 100644 --- a/plugins/influxdbjson/README.md +++ b/plugins/influxdbjson/README.md @@ -58,10 +58,10 @@ And if 192.168.2.1 responds like so: Then the collected metrics will be: ``` -influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 -influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 +influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 -influxdbjson_produce_transactions,influxdbjson_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +influxdbjson_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 ``` There are two important details to note about the collected metrics: diff --git a/plugins/influxdbjson/influxdb_json.go b/plugins/influxdbjson/influxdb_json.go index f0066ab78..61d966e47 100644 --- a/plugins/influxdbjson/influxdb_json.go +++ b/plugins/influxdbjson/influxdb_json.go @@ -40,14 +40,14 @@ func (*InfluxDBJSON) SampleConfig() string { # # Would result in this recorded metric: # - # influxdbjson_server_connections,influxdbjson_url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 + # influxdbjson_server_connections,url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 [[plugins.influxdbjson]] # Name to use for measurement name = "influxdb" # Multiple URLs from which to read InfluxDB-formatted JSON urls = [ - "http://localhost:8086/debug/vars" + "http://localhost:8086/debug/vars" ] ` } @@ -142,7 +142,7 @@ func (i *InfluxDBJSON) gatherURL( } // Add a tag to indicate the source of the data. - p.Tags["influxdbjson_url"] = url + p.Tags["url"] = url acc.AddFields( i.Name+"_"+p.Name, diff --git a/plugins/influxdbjson/influxdb_json_test.go b/plugins/influxdbjson/influxdb_json_test.go index ed75f4fb8..1f3663e20 100644 --- a/plugins/influxdbjson/influxdb_json_test.go +++ b/plugins/influxdbjson/influxdb_json_test.go @@ -26,11 +26,11 @@ func TestBasic(t *testing.T) { } }, "ignored": { - "willBeRecorded": false + "willBeRecorded": false }, "ignoredAndNested": { "hash": { - "is": "nested" + "is": "nested" } }, "array": [ @@ -40,11 +40,11 @@ func TestBasic(t *testing.T) { "_2": { "name": "bar", "tags": { - "id": "ex2" - }, - "values": { - "x": "x" - } + "id": "ex2" + }, + "values": { + "x": "x" + } }, "pointWithoutFields_willNotBeIncluded": { "name": "asdf", @@ -84,8 +84,8 @@ func TestBasic(t *testing.T) { "s": "string", }, map[string]string{ - "id": "ex1", - "influxdbjson_url": fakeServer.URL + "/endpoint", + "id": "ex1", + "url": fakeServer.URL + "/endpoint", }, )) require.NoError(t, acc.ValidateTaggedFieldsValue( @@ -94,8 +94,8 @@ func TestBasic(t *testing.T) { "x": "x", }, map[string]string{ - "id": "ex2", - "influxdbjson_url": fakeServer.URL + "/endpoint", + "id": "ex2", + "url": fakeServer.URL + "/endpoint", }, )) } From 60dc19fe69a1ad69ee05f9acee266bedb2d246b5 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Tue, 15 Dec 2015 22:12:47 -0800 Subject: [PATCH 5/5] Rename influxdbjson to influxdb --- plugins/all/all.go | 2 +- plugins/{influxdbjson => influxdb}/README.md | 12 +-- .../influxdb_json.go => influxdb/influxdb.go} | 77 ++++++++----------- .../influxdb_test.go} | 6 +- 4 files changed, 43 insertions(+), 54 deletions(-) rename plugins/{influxdbjson => influxdb}/README.md (73%) rename plugins/{influxdbjson/influxdb_json.go => influxdb/influxdb.go} (63%) rename plugins/{influxdbjson/influxdb_json_test.go => influxdb/influxdb_test.go} (93%) diff --git a/plugins/all/all.go b/plugins/all/all.go index c280f6bf4..8b4e754be 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,7 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" - _ "github.com/influxdb/telegraf/plugins/influxdbjson" + _ "github.com/influxdb/telegraf/plugins/influxdb" _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" diff --git a/plugins/influxdbjson/README.md b/plugins/influxdb/README.md similarity index 73% rename from plugins/influxdbjson/README.md rename to plugins/influxdb/README.md index f26c937f2..c55aa1fc8 100644 --- a/plugins/influxdbjson/README.md +++ b/plugins/influxdb/README.md @@ -1,11 +1,11 @@ -# influxdbjson plugin +# influxdb plugin -The influxdbjson plugin collects InfluxDB-formatted data from JSON endpoints. +The influxdb plugin collects InfluxDB-formatted data from JSON endpoints. With a configuration of: ```toml -[[plugins.influxdbjson]] +[[plugins.influxdb]] name = "produce" urls = [ "http://127.0.0.1:8086/debug/vars", @@ -58,10 +58,10 @@ And if 192.168.2.1 responds like so: Then the collected metrics will be: ``` -influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 -influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 +influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 -influxdbjson_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +influxdb_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 ``` There are two important details to note about the collected metrics: diff --git a/plugins/influxdbjson/influxdb_json.go b/plugins/influxdb/influxdb.go similarity index 63% rename from plugins/influxdbjson/influxdb_json.go rename to plugins/influxdb/influxdb.go index 61d966e47..a261018ae 100644 --- a/plugins/influxdbjson/influxdb_json.go +++ b/plugins/influxdb/influxdb.go @@ -1,8 +1,9 @@ -package influxdbjson +package influxdb import ( "encoding/json" "errors" + "fmt" "net/http" "strings" "sync" @@ -10,59 +11,41 @@ import ( "github.com/influxdb/telegraf/plugins" ) -type InfluxDBJSON struct { +type InfluxDB struct { Name string URLs []string `toml:"urls"` } -func (*InfluxDBJSON) Description() string { +func (*InfluxDB) Description() string { return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints" } -func (*InfluxDBJSON) SampleConfig() string { +func (*InfluxDB) SampleConfig() string { return ` - # Reads InfluxDB-formatted JSON from given URLs. For example, - # monitoring a URL which responded with a JSON object formatted like this: - # - # { - # "(ignored_key)": { - # "name": "connections", - # "tags": { - # "host": "foo" - # }, - # "values": { - # "avg_ms": 1.234, - # } - # } - # } - # - # with configuration of { name = "server", urls = ["http://127.0.0.1:8086/x"] } - # - # Would result in this recorded metric: - # - # influxdbjson_server_connections,url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234 - [[plugins.influxdbjson]] - # Name to use for measurement - name = "influxdb" + # Reads InfluxDB-formatted JSON from given URLs. + # Works with InfluxDB debug endpoints out of the box, but other services can use this format too. + # See the influxdb plugin's README for more details. + [[plugins.influxdb]] + # Name to use for measurement + name = "influxdb" - # Multiple URLs from which to read InfluxDB-formatted JSON - urls = [ - "http://localhost:8086/debug/vars" - ] + # Multiple URLs from which to read InfluxDB-formatted JSON + urls = [ + "http://localhost:8086/debug/vars" + ] ` } -func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error { - var wg sync.WaitGroup - +func (i *InfluxDB) Gather(acc plugins.Accumulator) error { errorChannel := make(chan error, len(i.URLs)) + var wg sync.WaitGroup for _, u := range i.URLs { wg.Add(1) go func(url string) { defer wg.Done() if err := i.gatherURL(acc, url); err != nil { - errorChannel <- err + errorChannel <- fmt.Errorf("[name=%s][url=%s]: %s", i.Name, url, err) } }(u) } @@ -70,15 +53,17 @@ func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error { wg.Wait() close(errorChannel) - // Get all errors and return them as one giant error - errorStrings := []string{} + // If there weren't any errors, we can return nil now. + if len(errorChannel) == 0 { + return nil + } + + // There were errors, so join them all together as one big error. + errorStrings := make([]string, 0, len(errorChannel)) for err := range errorChannel { errorStrings = append(errorStrings, err.Error()) } - if len(errorStrings) == 0 { - return nil - } return errors.New(strings.Join(errorStrings, "\n")) } @@ -95,7 +80,7 @@ type point struct { // // Returns: // error: Any error that may have occurred -func (i *InfluxDBJSON) gatherURL( +func (i *InfluxDB) gatherURL( acc plugins.Accumulator, url string, ) error { @@ -105,7 +90,11 @@ func (i *InfluxDBJSON) gatherURL( } defer resp.Body.Close() - // Can't predict what all is going to be in the response, so decode the top keys one at a time. + // It would be nice to be able to decode into a map[string]point, but + // we'll get a decoder error like: + // `json: cannot unmarshal array into Go value of type influxdb.point` + // if any of the values aren't objects. + // To avoid that error, we decode by hand. dec := json.NewDecoder(resp.Body) // Parse beginning of object @@ -155,7 +144,7 @@ func (i *InfluxDBJSON) gatherURL( } func init() { - plugins.Add("influxdbjson", func() plugins.Plugin { - return &InfluxDBJSON{} + plugins.Add("influxdb", func() plugins.Plugin { + return &InfluxDB{} }) } diff --git a/plugins/influxdbjson/influxdb_json_test.go b/plugins/influxdb/influxdb_test.go similarity index 93% rename from plugins/influxdbjson/influxdb_json_test.go rename to plugins/influxdb/influxdb_test.go index 1f3663e20..f5035bc7e 100644 --- a/plugins/influxdbjson/influxdb_json_test.go +++ b/plugins/influxdb/influxdb_test.go @@ -1,11 +1,11 @@ -package influxdbjson_test +package influxdb_test import ( "net/http" "net/http/httptest" "testing" - "github.com/influxdb/telegraf/plugins/influxdbjson" + "github.com/influxdb/telegraf/plugins/influxdb" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -64,7 +64,7 @@ func TestBasic(t *testing.T) { })) defer fakeServer.Close() - plugin := &influxdbjson.InfluxDBJSON{ + plugin := &influxdb.InfluxDB{ Name: "test", URLs: []string{fakeServer.URL + "/endpoint"}, }