From 2811ca61eaa8ee3852873fac1a0481121240a65e Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Sat, 5 Dec 2015 14:15:58 -0800 Subject: [PATCH] Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ``` --- plugins/all/all.go | 1 + plugins/influxdb/README.md | 73 +++++++++++++++ plugins/influxdb/influxdb.go | 150 ++++++++++++++++++++++++++++++ plugins/influxdb/influxdb_test.go | 101 ++++++++++++++++++++ 4 files changed, 325 insertions(+) create mode 100644 plugins/influxdb/README.md create mode 100644 plugins/influxdb/influxdb.go create mode 100644 plugins/influxdb/influxdb_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 676d25707..8b4e754be 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/influxdb" _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" diff --git a/plugins/influxdb/README.md b/plugins/influxdb/README.md new file mode 100644 index 000000000..c55aa1fc8 --- /dev/null +++ b/plugins/influxdb/README.md @@ -0,0 +1,73 @@ +# influxdb plugin + +The influxdb plugin collects InfluxDB-formatted data from JSON endpoints. + +With a configuration of: + +```toml +[[plugins.influxdb]] + name = "produce" + urls = [ + "http://127.0.0.1:8086/debug/vars", + "http://192.168.2.1:8086/debug/vars" + ] +``` + +And if 127.0.0.1 responds with this JSON: + +```json +{ + "k1": { + "name": "fruit", + "tags": { + "kind": "apple" + }, + "values": { + "inventory": 371, + "sold": 112 + } + }, + "k2": { + "name": "fruit", + "tags": { + "kind": "banana" + }, + "values": { + "inventory": 1000, + "sold": 403 + } + } +} +``` + +And if 192.168.2.1 responds like so: + +```json +{ + "k3": { + "name": "transactions", + "tags": {}, + "values": { + "total": 100, + "balance": 184.75 + } + } +} +``` + +Then the collected metrics will be: + +``` +influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 + +influxdb_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +``` + +There are two important details to note about the collected metrics: + +1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats. +JSON encoders usually don't print the fractional part for round floats. +Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats. + +2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics. diff --git a/plugins/influxdb/influxdb.go b/plugins/influxdb/influxdb.go new file mode 100644 index 000000000..a261018ae --- /dev/null +++ b/plugins/influxdb/influxdb.go @@ -0,0 +1,150 @@ +package influxdb + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "sync" + + "github.com/influxdb/telegraf/plugins" +) + +type InfluxDB struct { + Name string + URLs []string `toml:"urls"` +} + +func (*InfluxDB) Description() string { + return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints" +} + +func (*InfluxDB) SampleConfig() string { + return ` + # Reads InfluxDB-formatted JSON from given URLs. + # Works with InfluxDB debug endpoints out of the box, but other services can use this format too. + # See the influxdb plugin's README for more details. + [[plugins.influxdb]] + # Name to use for measurement + name = "influxdb" + + # Multiple URLs from which to read InfluxDB-formatted JSON + urls = [ + "http://localhost:8086/debug/vars" + ] +` +} + +func (i *InfluxDB) Gather(acc plugins.Accumulator) error { + errorChannel := make(chan error, len(i.URLs)) + + var wg sync.WaitGroup + for _, u := range i.URLs { + wg.Add(1) + go func(url string) { + defer wg.Done() + if err := i.gatherURL(acc, url); err != nil { + errorChannel <- fmt.Errorf("[name=%s][url=%s]: %s", i.Name, url, err) + } + }(u) + } + + wg.Wait() + close(errorChannel) + + // If there weren't any errors, we can return nil now. + if len(errorChannel) == 0 { + return nil + } + + // There were errors, so join them all together as one big error. + errorStrings := make([]string, 0, len(errorChannel)) + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + return errors.New(strings.Join(errorStrings, "\n")) +} + +type point struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// Gathers data from a particular URL +// Parameters: +// acc : The telegraf Accumulator to use +// url : endpoint to send request to +// +// Returns: +// error: Any error that may have occurred +func (i *InfluxDB) gatherURL( + acc plugins.Accumulator, + url string, +) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + // It would be nice to be able to decode into a map[string]point, but + // we'll get a decoder error like: + // `json: cannot unmarshal array into Go value of type influxdb.point` + // if any of the values aren't objects. + // To avoid that error, we decode by hand. + dec := json.NewDecoder(resp.Body) + + // Parse beginning of object + if t, err := dec.Token(); err != nil { + return err + } else if t != json.Delim('{') { + return errors.New("document root must be a JSON object") + } + + // Loop through rest of object + for { + // Nothing left in this object, we're done + if !dec.More() { + break + } + + // Read in a string key. We don't do anything with the top-level keys, so it's discarded. + _, err := dec.Token() + if err != nil { + return err + } + + // Attempt to parse a whole object into a point. + // It might be a non-object, like a string or array. + // If we fail to decode it into a point, ignore it and move on. + var p point + if err := dec.Decode(&p); err != nil { + continue + } + + // If the object was a point, but was not fully initialized, ignore it and move on. + if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 { + continue + } + + // Add a tag to indicate the source of the data. + p.Tags["url"] = url + + acc.AddFields( + i.Name+"_"+p.Name, + p.Values, + p.Tags, + ) + } + + return nil +} + +func init() { + plugins.Add("influxdb", func() plugins.Plugin { + return &InfluxDB{} + }) +} diff --git a/plugins/influxdb/influxdb_test.go b/plugins/influxdb/influxdb_test.go new file mode 100644 index 000000000..f5035bc7e --- /dev/null +++ b/plugins/influxdb/influxdb_test.go @@ -0,0 +1,101 @@ +package influxdb_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/telegraf/plugins/influxdb" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestBasic(t *testing.T) { + js := ` +{ + "_1": { + "name": "foo", + "tags": { + "id": "ex1" + }, + "values": { + "i": -1, + "f": 0.5, + "b": true, + "s": "string" + } + }, + "ignored": { + "willBeRecorded": false + }, + "ignoredAndNested": { + "hash": { + "is": "nested" + } + }, + "array": [ + "makes parsing more difficult than necessary" + ], + "string": "makes parsing more difficult than necessary", + "_2": { + "name": "bar", + "tags": { + "id": "ex2" + }, + "values": { + "x": "x" + } + }, + "pointWithoutFields_willNotBeIncluded": { + "name": "asdf", + "tags": { + "id": "ex3" + }, + "values": {} + } +} +` + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/endpoint" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + plugin := &influxdb.InfluxDB{ + Name: "test", + URLs: []string{fakeServer.URL + "/endpoint"}, + } + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + + require.Len(t, acc.Points, 2) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_foo", + map[string]interface{}{ + // JSON will truncate floats to integer representations. + // Since there's no distinction in JSON, we can't assume it's an int. + "i": -1.0, + "f": 0.5, + "b": true, + "s": "string", + }, + map[string]string{ + "id": "ex1", + "url": fakeServer.URL + "/endpoint", + }, + )) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_bar", + map[string]interface{}{ + "x": "x", + }, + map[string]string{ + "id": "ex2", + "url": fakeServer.URL + "/endpoint", + }, + )) +}