Add expvar plugin

This was primarily intended to consume InfluxDB-style expvars,
particularly InfluxDB's `/debug/vars` endpoint.

That endpoint follows a structure like

```json
{
  "httpd::8086": {
    "name": "httpd",
    "tags": {
      "bind": ":8086"
    },
    "values": {
      "pointsWrittenOK": 33756,
      "queryReq": 19,
      "queryRespBytes": 26973,
      "req": 428,
      "writeReq": 205,
      "writeReqBytes": 3939161
    }
  }
}
```

There are an arbitrary number of top-level keys in the JSON response at
the configured URLs, and this plugin will iterate through all of their
values looking for objects with keys "name", "tags", and "values"
indicating a metric to be consumed by telegraf.

Running this on current master of InfluxDB, I am able to record nearly
the same information that is normally stored in the `_internal`
database; the only measurement missing from `_internal` is `runtime`,
which is present in expvars under the "memstats" key but does not follow
the expvar format and so is not consumed in this plugin.

```
$ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /expvar/'

name: expvar_influxdb_engine
----------------------------
fieldKey
blksWrite
blksWriteBytes
blksWriteBytesC
pointsWrite
pointsWriteDedupe

name: expvar_influxdb_httpd
---------------------------
fieldKey
pingReq
pointsWrittenOK
queryReq
queryRespBytes
req
writeReq
writeReqBytes

name: expvar_influxdb_shard
---------------------------
fieldKey
fieldsCreate
seriesCreate
writePointsOk
writeReq

name: expvar_influxdb_subscriber
--------------------------------
fieldKey
pointsWritten

name: expvar_influxdb_wal
-------------------------
fieldKey
autoFlush
flushDuration
idleFlush
memSize
metaFlush
pointsFlush
pointsWrite
pointsWriteReq
seriesFlush

name: expvar_influxdb_write
---------------------------
fieldKey
pointReq
pointReqLocal
req
subWriteOk
writeOk
```
This commit is contained in:
Mark Rushakoff 2015-12-05 14:15:58 -08:00
parent d62e63c448
commit bc9a71e111
4 changed files with 333 additions and 0 deletions

View File

@ -7,6 +7,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/disque"
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
_ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/expvar"
_ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/httpjson"
_ "github.com/influxdb/telegraf/plugins/jolokia"

74
plugins/expvar/README.md Normal file
View File

@ -0,0 +1,74 @@
# expvar plugin
The expvar plugin collects InfluxDB-style expvar data from JSON endpoints.
With a configuration of:
```toml
[plugins.expvar]
[[plugins.expvar.services]]
name = "produce"
urls = [
"http://127.0.0.1:8086/debug/vars",
"http://192.168.2.1:8086/debug/vars"
]
```
And if 127.0.0.1 responds with this JSON:
```json
{
"k1": {
"name": "fruit",
"tags": {
"kind": "apple"
},
"values": {
"inventory": 371,
"sold": 112
}
},
"k2": {
"name": "fruit",
"tags": {
"kind": "banana"
},
"values": {
"inventory": 1000,
"sold": 403
}
}
}
```
And if 192.168.2.1 responds like so:
```json
{
"k3": {
"name": "transactions",
"tags": {},
"values": {
"total": 100,
"balance": 184.75
}
}
}
```
Then the collected metrics will be:
```
expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0
expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0
expvar_produce_transactions,expvar_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75
```
There are two important details to note about the collected metrics:
1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats.
JSON encoders usually don't print the fractional part for round floats.
Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats.
2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics.

154
plugins/expvar/expvar.go Normal file
View File

@ -0,0 +1,154 @@
package expvar
import (
"encoding/json"
"errors"
"net/http"
"strings"
"sync"
"github.com/influxdb/telegraf/plugins"
)
var sampleConfig = `
# Specify services via an array of tables
[[plugins.expvar.services]]
# Name for the service being polled
name = "influxdb"
# Multiple URLs from which to read expvars
urls = [
"http://localhost:8086/debug/vars"
]
`
type Expvar struct {
Services []Service
}
type Service struct {
Name string
URLs []string `toml:"urls"`
}
func (*Expvar) Description() string {
return "Read InfluxDB-style expvar metrics from one or more HTTP endpoints"
}
func (*Expvar) SampleConfig() string {
return sampleConfig
}
func (e *Expvar) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
totalURLs := 0
for _, service := range e.Services {
totalURLs += len(service.URLs)
}
errorChannel := make(chan error, totalURLs)
for _, service := range e.Services {
for _, u := range service.URLs {
wg.Add(1)
go func(service Service, url string) {
defer wg.Done()
if err := e.gatherURL(acc, service, url); err != nil {
errorChannel <- err
}
}(service, u)
}
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
type point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
// service: the service being queried
// url : endpoint to send request to
//
// Returns:
// error: Any error that may have occurred
func (e *Expvar) gatherURL(
acc plugins.Accumulator,
service Service,
url string,
) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// Can't predict what all is going to be in the response, so decode the top keys one at a time.
dec := json.NewDecoder(resp.Body)
// Parse beginning of object
if t, err := dec.Token(); err != nil {
return err
} else if t != json.Delim('{') {
return errors.New("expvars must be a JSON object")
}
// Loop through rest of object
for {
// Nothing left in this object, we're done
if !dec.More() {
break
}
// Read in a string key. We actually don't care about the top-level keys
_, err := dec.Token()
if err != nil {
return err
}
// Attempt to parse a whole object into a point.
// It might be a non-object, like a string or array.
// If we fail to decode it into a point, ignore it and move on.
var p point
if err := dec.Decode(&p); err != nil {
continue
}
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 {
continue
}
p.Tags["expvar_url"] = url
acc.AddFields(
service.Name+"_"+p.Name,
p.Values,
p.Tags,
)
}
return nil
}
func init() {
plugins.Add("expvar", func() plugins.Plugin {
return &Expvar{}
})
}

View File

@ -0,0 +1,104 @@
package expvar
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestBasic(t *testing.T) {
js := `
{
"_1": {
"name": "foo",
"tags": {
"id": "ex1"
},
"values": {
"i": -1,
"f": 0.5,
"b": true,
"s": "string"
}
},
"ignored": {
"willBeRecorded": false
},
"ignoredAndNested": {
"hash": {
"is": "nested"
}
},
"array": [
"makes parsing more difficult than necessary"
],
"string": "makes parsing more difficult than necessary",
"_2": {
"name": "bar",
"tags": {
"id": "ex2"
},
"values": {
"x": "x"
}
},
"pointWithoutFields_willNotBeIncluded": {
"name": "asdf",
"tags": {
"id": "ex3"
},
"values": {}
}
}
`
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/expvar" {
_, _ = w.Write([]byte(js))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()
expvar := &Expvar{
Services: []Service{
{
Name: "test",
URLs: []string{fakeServer.URL + "/expvar"},
},
},
}
var acc testutil.Accumulator
require.NoError(t, expvar.Gather(&acc))
require.Len(t, acc.Points, 2)
require.NoError(t, acc.ValidateTaggedFieldsValue(
"test_foo",
map[string]interface{}{
// JSON will truncate floats to integer representations.
// Since there's no distinction in JSON, we can't assume it's an int.
"i": -1.0,
"f": 0.5,
"b": true,
"s": "string",
},
map[string]string{
"id": "ex1",
"expvar_url": fakeServer.URL + "/expvar",
},
))
require.NoError(t, acc.ValidateTaggedFieldsValue(
"test_bar",
map[string]interface{}{
"x": "x",
},
map[string]string{
"id": "ex2",
"expvar_url": fakeServer.URL + "/expvar",
},
))
}