From 494626514b275301f1cb6e06e134719af089ae59 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Sat, 5 Dec 2015 14:15:58 -0800 Subject: [PATCH] Add expvar plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present in expvars under the "memstats" key but does not follow the expvar format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /expvar/' name: expvar_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: expvar_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: expvar_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: expvar_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: expvar_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: expvar_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ``` --- plugins/all/all.go | 1 + plugins/expvar/README.md | 74 ++++++++++++++++ plugins/expvar/expvar.go | 154 ++++++++++++++++++++++++++++++++++ plugins/expvar/expvar_test.go | 104 +++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 plugins/expvar/README.md create mode 100644 plugins/expvar/expvar.go create mode 100644 plugins/expvar/expvar_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 676d25707..e6303e066 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/exec" + _ "github.com/influxdb/telegraf/plugins/expvar" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/jolokia" diff --git a/plugins/expvar/README.md b/plugins/expvar/README.md new file mode 100644 index 000000000..724a46b33 --- /dev/null +++ b/plugins/expvar/README.md @@ -0,0 +1,74 @@ +# expvar plugin + +The expvar plugin collects InfluxDB-style expvar data from JSON endpoints. + +With a configuration of: + +```toml +[plugins.expvar] + [[plugins.expvar.services]] + name = "produce" + urls = [ + "http://127.0.0.1:8086/debug/vars", + "http://192.168.2.1:8086/debug/vars" + ] +``` + +And if 127.0.0.1 responds with this JSON: + +```json +{ + "k1": { + "name": "fruit", + "tags": { + "kind": "apple" + }, + "values": { + "inventory": 371, + "sold": 112 + } + }, + "k2": { + "name": "fruit", + "tags": { + "kind": "banana" + }, + "values": { + "inventory": 1000, + "sold": 403 + } + } +} +``` + +And if 192.168.2.1 responds like so: + +```json +{ + "k3": { + "name": "transactions", + "tags": {}, + "values": { + "total": 100, + "balance": 184.75 + } + } +} +``` + +Then the collected metrics will be: + +``` +expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 +expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 + +expvar_produce_transactions,expvar_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 +``` + +There are two important details to note about the collected metrics: + +1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats. +JSON encoders usually don't print the fractional part for round floats. +Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats. + +2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics. diff --git a/plugins/expvar/expvar.go b/plugins/expvar/expvar.go new file mode 100644 index 000000000..cacd68e43 --- /dev/null +++ b/plugins/expvar/expvar.go @@ -0,0 +1,154 @@ +package expvar + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + "sync" + + "github.com/influxdb/telegraf/plugins" +) + +var sampleConfig = ` + # Specify services via an array of tables + [[plugins.expvar.services]] + # Name for the service being polled + name = "influxdb" + + # Multiple URLs from which to read expvars + urls = [ + "http://localhost:8086/debug/vars" + ] +` + +type Expvar struct { + Services []Service +} + +type Service struct { + Name string + URLs []string `toml:"urls"` +} + +func (*Expvar) Description() string { + return "Read InfluxDB-style expvar metrics from one or more HTTP endpoints" +} + +func (*Expvar) SampleConfig() string { + return sampleConfig +} + +func (e *Expvar) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + totalURLs := 0 + for _, service := range e.Services { + totalURLs += len(service.URLs) + } + errorChannel := make(chan error, totalURLs) + + for _, service := range e.Services { + for _, u := range service.URLs { + wg.Add(1) + go func(service Service, url string) { + defer wg.Done() + if err := e.gatherURL(acc, service, url); err != nil { + errorChannel <- err + } + }(service, u) + } + } + + wg.Wait() + close(errorChannel) + + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) +} + +type point struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// Gathers data from a particular URL +// Parameters: +// acc : The telegraf Accumulator to use +// service: the service being queried +// url : endpoint to send request to +// +// Returns: +// error: Any error that may have occurred +func (e *Expvar) gatherURL( + acc plugins.Accumulator, + service Service, + url string, +) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + // Can't predict what all is going to be in the response, so decode the top keys one at a time. + dec := json.NewDecoder(resp.Body) + + // Parse beginning of object + if t, err := dec.Token(); err != nil { + return err + } else if t != json.Delim('{') { + return errors.New("expvars must be a JSON object") + } + + // Loop through rest of object + for { + // Nothing left in this object, we're done + if !dec.More() { + break + } + + // Read in a string key. We actually don't care about the top-level keys + _, err := dec.Token() + if err != nil { + return err + } + + // Attempt to parse a whole object into a point. + // It might be a non-object, like a string or array. + // If we fail to decode it into a point, ignore it and move on. + var p point + if err := dec.Decode(&p); err != nil { + continue + } + + if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 { + continue + } + + p.Tags["expvar_url"] = url + + acc.AddFields( + service.Name+"_"+p.Name, + p.Values, + p.Tags, + ) + } + + return nil +} + +func init() { + plugins.Add("expvar", func() plugins.Plugin { + return &Expvar{} + }) +} diff --git a/plugins/expvar/expvar_test.go b/plugins/expvar/expvar_test.go new file mode 100644 index 000000000..a60147e98 --- /dev/null +++ b/plugins/expvar/expvar_test.go @@ -0,0 +1,104 @@ +package expvar + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestBasic(t *testing.T) { + js := ` +{ + "_1": { + "name": "foo", + "tags": { + "id": "ex1" + }, + "values": { + "i": -1, + "f": 0.5, + "b": true, + "s": "string" + } + }, + "ignored": { + "willBeRecorded": false + }, + "ignoredAndNested": { + "hash": { + "is": "nested" + } + }, + "array": [ + "makes parsing more difficult than necessary" + ], + "string": "makes parsing more difficult than necessary", + "_2": { + "name": "bar", + "tags": { + "id": "ex2" + }, + "values": { + "x": "x" + } + }, + "pointWithoutFields_willNotBeIncluded": { + "name": "asdf", + "tags": { + "id": "ex3" + }, + "values": {} + } +} +` + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/expvar" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + expvar := &Expvar{ + Services: []Service{ + { + Name: "test", + URLs: []string{fakeServer.URL + "/expvar"}, + }, + }, + } + + var acc testutil.Accumulator + require.NoError(t, expvar.Gather(&acc)) + + require.Len(t, acc.Points, 2) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_foo", + map[string]interface{}{ + // JSON will truncate floats to integer representations. + // Since there's no distinction in JSON, we can't assume it's an int. + "i": -1.0, + "f": 0.5, + "b": true, + "s": "string", + }, + map[string]string{ + "id": "ex1", + "expvar_url": fakeServer.URL + "/expvar", + }, + )) + require.NoError(t, acc.ValidateTaggedFieldsValue( + "test_bar", + map[string]interface{}{ + "x": "x", + }, + map[string]string{ + "id": "ex2", + "expvar_url": fakeServer.URL + "/expvar", + }, + )) +}