telegraf/plugins/inputs/influxdb/influxdb.go

147 lines
3.3 KiB
Go
Raw Permalink Normal View History

Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
package influxdb
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
2016-01-20 18:57:35 +00:00
"github.com/influxdata/telegraf/plugins/inputs"
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
)
type InfluxDB struct {
URLs []string `toml:"urls"`
}
func (*InfluxDB) Description() string {
return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints"
}
func (*InfluxDB) SampleConfig() string {
return `
# Works with InfluxDB debug endpoints out of the box,
# but other services can use this format too.
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
# See the influxdb plugin's README for more details.
# Multiple URLs from which to read InfluxDB-formatted JSON
urls = [
"http://localhost:8086/debug/vars"
]
`
}
2016-01-07 20:39:43 +00:00
func (i *InfluxDB) Gather(acc inputs.Accumulator) error {
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
errorChannel := make(chan error, len(i.URLs))
var wg sync.WaitGroup
for _, u := range i.URLs {
wg.Add(1)
go func(url string) {
defer wg.Done()
if err := i.gatherURL(acc, url); err != nil {
errorChannel <- fmt.Errorf("[url=%s]: %s", url, err)
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
}
}(u)
}
wg.Wait()
close(errorChannel)
// If there weren't any errors, we can return nil now.
if len(errorChannel) == 0 {
return nil
}
// There were errors, so join them all together as one big error.
errorStrings := make([]string, 0, len(errorChannel))
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
return errors.New(strings.Join(errorStrings, "\n"))
}
type point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
// url : endpoint to send request to
//
// Returns:
// error: Any error that may have occurred
func (i *InfluxDB) gatherURL(
2016-01-07 20:39:43 +00:00
acc inputs.Accumulator,
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
url string,
) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// It would be nice to be able to decode into a map[string]point, but
// we'll get a decoder error like:
// `json: cannot unmarshal array into Go value of type influxdb.point`
// if any of the values aren't objects.
// To avoid that error, we decode by hand.
dec := json.NewDecoder(resp.Body)
// Parse beginning of object
if t, err := dec.Token(); err != nil {
return err
} else if t != json.Delim('{') {
return errors.New("document root must be a JSON object")
}
// Loop through rest of object
for {
// Nothing left in this object, we're done
if !dec.More() {
break
}
// Read in a string key. We don't do anything with the top-level keys, so it's discarded.
_, err := dec.Token()
if err != nil {
return err
}
// Attempt to parse a whole object into a point.
// It might be a non-object, like a string or array.
// If we fail to decode it into a point, ignore it and move on.
var p point
if err := dec.Decode(&p); err != nil {
continue
}
// If the object was a point, but was not fully initialized, ignore it and move on.
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 {
continue
}
// Add a tag to indicate the source of the data.
p.Tags["url"] = url
acc.AddFields(
"influxdb_"+p.Name,
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
p.Values,
p.Tags,
)
}
return nil
}
func init() {
2016-01-07 20:39:43 +00:00
inputs.Add("influxdb", func() inputs.Input {
Add influxdb plugin This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
2015-12-05 22:15:58 +00:00
return &InfluxDB{}
})
}