Rename influxdbjson to influxdb

This commit is contained in:
Mark Rushakoff 2015-12-15 22:12:47 -08:00
parent 65b9d2fcc9
commit 60dc19fe69
4 changed files with 43 additions and 54 deletions

View File

@ -9,7 +9,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/httpjson"
_ "github.com/influxdb/telegraf/plugins/influxdbjson" _ "github.com/influxdb/telegraf/plugins/influxdb"
_ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/jolokia"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/leofs"

View File

@ -1,11 +1,11 @@
# influxdbjson plugin # influxdb plugin
The influxdbjson plugin collects InfluxDB-formatted data from JSON endpoints. The influxdb plugin collects InfluxDB-formatted data from JSON endpoints.
With a configuration of: With a configuration of:
```toml ```toml
[[plugins.influxdbjson]] [[plugins.influxdb]]
name = "produce" name = "produce"
urls = [ urls = [
"http://127.0.0.1:8086/debug/vars", "http://127.0.0.1:8086/debug/vars",
@ -58,10 +58,10 @@ And if 192.168.2.1 responds like so:
Then the collected metrics will be: Then the collected metrics will be:
``` ```
influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0
influxdbjson_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0
influxdbjson_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 influxdb_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75
``` ```
There are two important details to note about the collected metrics: There are two important details to note about the collected metrics:

View File

@ -1,8 +1,9 @@
package influxdbjson package influxdb
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@ -10,59 +11,41 @@ import (
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type InfluxDBJSON struct { type InfluxDB struct {
Name string Name string
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
} }
func (*InfluxDBJSON) Description() string { func (*InfluxDB) Description() string {
return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints" return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints"
} }
func (*InfluxDBJSON) SampleConfig() string { func (*InfluxDB) SampleConfig() string {
return ` return `
# Reads InfluxDB-formatted JSON from given URLs. For example, # Reads InfluxDB-formatted JSON from given URLs.
# monitoring a URL which responded with a JSON object formatted like this: # Works with InfluxDB debug endpoints out of the box, but other services can use this format too.
# # See the influxdb plugin's README for more details.
# { [[plugins.influxdb]]
# "(ignored_key)": { # Name to use for measurement
# "name": "connections", name = "influxdb"
# "tags": {
# "host": "foo"
# },
# "values": {
# "avg_ms": 1.234,
# }
# }
# }
#
# with configuration of { name = "server", urls = ["http://127.0.0.1:8086/x"] }
#
# Would result in this recorded metric:
#
# influxdbjson_server_connections,url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234
[[plugins.influxdbjson]]
# Name to use for measurement
name = "influxdb"
# Multiple URLs from which to read InfluxDB-formatted JSON # Multiple URLs from which to read InfluxDB-formatted JSON
urls = [ urls = [
"http://localhost:8086/debug/vars" "http://localhost:8086/debug/vars"
] ]
` `
} }
func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error { func (i *InfluxDB) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
errorChannel := make(chan error, len(i.URLs)) errorChannel := make(chan error, len(i.URLs))
var wg sync.WaitGroup
for _, u := range i.URLs { for _, u := range i.URLs {
wg.Add(1) wg.Add(1)
go func(url string) { go func(url string) {
defer wg.Done() defer wg.Done()
if err := i.gatherURL(acc, url); err != nil { if err := i.gatherURL(acc, url); err != nil {
errorChannel <- err errorChannel <- fmt.Errorf("[name=%s][url=%s]: %s", i.Name, url, err)
} }
}(u) }(u)
} }
@ -70,15 +53,17 @@ func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error {
wg.Wait() wg.Wait()
close(errorChannel) close(errorChannel)
// Get all errors and return them as one giant error // If there weren't any errors, we can return nil now.
errorStrings := []string{} if len(errorChannel) == 0 {
return nil
}
// There were errors, so join them all together as one big error.
errorStrings := make([]string, 0, len(errorChannel))
for err := range errorChannel { for err := range errorChannel {
errorStrings = append(errorStrings, err.Error()) errorStrings = append(errorStrings, err.Error())
} }
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n")) return errors.New(strings.Join(errorStrings, "\n"))
} }
@ -95,7 +80,7 @@ type point struct {
// //
// Returns: // Returns:
// error: Any error that may have occurred // error: Any error that may have occurred
func (i *InfluxDBJSON) gatherURL( func (i *InfluxDB) gatherURL(
acc plugins.Accumulator, acc plugins.Accumulator,
url string, url string,
) error { ) error {
@ -105,7 +90,11 @@ func (i *InfluxDBJSON) gatherURL(
} }
defer resp.Body.Close() defer resp.Body.Close()
// Can't predict what all is going to be in the response, so decode the top keys one at a time. // It would be nice to be able to decode into a map[string]point, but
// we'll get a decoder error like:
// `json: cannot unmarshal array into Go value of type influxdb.point`
// if any of the values aren't objects.
// To avoid that error, we decode by hand.
dec := json.NewDecoder(resp.Body) dec := json.NewDecoder(resp.Body)
// Parse beginning of object // Parse beginning of object
@ -155,7 +144,7 @@ func (i *InfluxDBJSON) gatherURL(
} }
func init() { func init() {
plugins.Add("influxdbjson", func() plugins.Plugin { plugins.Add("influxdb", func() plugins.Plugin {
return &InfluxDBJSON{} return &InfluxDB{}
}) })
} }

View File

@ -1,11 +1,11 @@
package influxdbjson_test package influxdb_test
import ( import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/influxdb/telegraf/plugins/influxdbjson" "github.com/influxdb/telegraf/plugins/influxdb"
"github.com/influxdb/telegraf/testutil" "github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -64,7 +64,7 @@ func TestBasic(t *testing.T) {
})) }))
defer fakeServer.Close() defer fakeServer.Close()
plugin := &influxdbjson.InfluxDBJSON{ plugin := &influxdb.InfluxDB{
Name: "test", Name: "test",
URLs: []string{fakeServer.URL + "/endpoint"}, URLs: []string{fakeServer.URL + "/endpoint"},
} }