Rename expvar plugin -> influxdbjson
This commit is contained in:
parent
bc9a71e111
commit
b267c218e2
|
@ -7,9 +7,9 @@ import (
|
||||||
_ "github.com/influxdb/telegraf/plugins/disque"
|
_ "github.com/influxdb/telegraf/plugins/disque"
|
||||||
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
||||||
_ "github.com/influxdb/telegraf/plugins/exec"
|
_ "github.com/influxdb/telegraf/plugins/exec"
|
||||||
_ "github.com/influxdb/telegraf/plugins/expvar"
|
|
||||||
_ "github.com/influxdb/telegraf/plugins/haproxy"
|
_ "github.com/influxdb/telegraf/plugins/haproxy"
|
||||||
_ "github.com/influxdb/telegraf/plugins/httpjson"
|
_ "github.com/influxdb/telegraf/plugins/httpjson"
|
||||||
|
_ "github.com/influxdb/telegraf/plugins/influxdbjson"
|
||||||
_ "github.com/influxdb/telegraf/plugins/jolokia"
|
_ "github.com/influxdb/telegraf/plugins/jolokia"
|
||||||
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
||||||
_ "github.com/influxdb/telegraf/plugins/leofs"
|
_ "github.com/influxdb/telegraf/plugins/leofs"
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
# expvar plugin
|
# influxdbjson plugin
|
||||||
|
|
||||||
The expvar plugin collects InfluxDB-style expvar data from JSON endpoints.
|
The influxdbjson plugin collects InfluxDB-formatted data from JSON endpoints.
|
||||||
|
|
||||||
With a configuration of:
|
With a configuration of:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[plugins.expvar]
|
[[plugins.influxdbjson]]
|
||||||
[[plugins.expvar.services]]
|
|
||||||
name = "produce"
|
name = "produce"
|
||||||
urls = [
|
urls = [
|
||||||
"http://127.0.0.1:8086/debug/vars",
|
"http://127.0.0.1:8086/debug/vars",
|
||||||
|
@ -59,10 +58,10 @@ And if 192.168.2.1 responds like so:
|
||||||
Then the collected metrics will be:
|
Then the collected metrics will be:
|
||||||
|
|
||||||
```
|
```
|
||||||
expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0
|
influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0
|
||||||
expvar_produce_fruit,expvar_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0
|
influxdbjson_produce_fruit,influxdbjson_url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0
|
||||||
|
|
||||||
expvar_produce_transactions,expvar_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75
|
influxdbjson_produce_transactions,influxdbjson_url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75
|
||||||
```
|
```
|
||||||
|
|
||||||
There are two important details to note about the collected metrics:
|
There are two important details to note about the collected metrics:
|
|
@ -1,4 +1,4 @@
|
||||||
package expvar
|
package influxdbjson
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -10,54 +10,61 @@ import (
|
||||||
"github.com/influxdb/telegraf/plugins"
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
var sampleConfig = `
|
type InfluxDBJSON struct {
|
||||||
# Specify services via an array of tables
|
|
||||||
[[plugins.expvar.services]]
|
|
||||||
# Name for the service being polled
|
|
||||||
name = "influxdb"
|
|
||||||
|
|
||||||
# Multiple URLs from which to read expvars
|
|
||||||
urls = [
|
|
||||||
"http://localhost:8086/debug/vars"
|
|
||||||
]
|
|
||||||
`
|
|
||||||
|
|
||||||
type Expvar struct {
|
|
||||||
Services []Service
|
|
||||||
}
|
|
||||||
|
|
||||||
type Service struct {
|
|
||||||
Name string
|
Name string
|
||||||
URLs []string `toml:"urls"`
|
URLs []string `toml:"urls"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*Expvar) Description() string {
|
func (*InfluxDBJSON) Description() string {
|
||||||
return "Read InfluxDB-style expvar metrics from one or more HTTP endpoints"
|
return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*Expvar) SampleConfig() string {
|
func (*InfluxDBJSON) SampleConfig() string {
|
||||||
return sampleConfig
|
return `
|
||||||
|
# Reads InfluxDB-formatted JSON from given URLs. For example,
|
||||||
|
# monitoring a URL which responded with a JSON object formatted like this:
|
||||||
|
#
|
||||||
|
# {
|
||||||
|
# "(ignored_key)": {
|
||||||
|
# "name": "connections",
|
||||||
|
# "tags": {
|
||||||
|
# "host": "foo"
|
||||||
|
# },
|
||||||
|
# "values": {
|
||||||
|
# "avg_ms": 1.234,
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# with configuration of { name = "server", urls = ["http://127.0.0.1:8086/x"] }
|
||||||
|
#
|
||||||
|
# Would result in this recorded metric:
|
||||||
|
#
|
||||||
|
# influxdbjson_server_connections,influxdbjson_url='http://127.0.0.1:8086/x',host='foo' avg_ms=1.234
|
||||||
|
[[plugins.influxdbjson]]
|
||||||
|
# Name to use for measurement
|
||||||
|
name = "influxdb"
|
||||||
|
|
||||||
|
# Multiple URLs from which to read InfluxDB-formatted JSON
|
||||||
|
urls = [
|
||||||
|
"http://localhost:8086/debug/vars"
|
||||||
|
]
|
||||||
|
`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Expvar) Gather(acc plugins.Accumulator) error {
|
func (i *InfluxDBJSON) Gather(acc plugins.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
totalURLs := 0
|
errorChannel := make(chan error, len(i.URLs))
|
||||||
for _, service := range e.Services {
|
|
||||||
totalURLs += len(service.URLs)
|
|
||||||
}
|
|
||||||
errorChannel := make(chan error, totalURLs)
|
|
||||||
|
|
||||||
for _, service := range e.Services {
|
for _, u := range i.URLs {
|
||||||
for _, u := range service.URLs {
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(service Service, url string) {
|
go func(url string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := e.gatherURL(acc, service, url); err != nil {
|
if err := i.gatherURL(acc, url); err != nil {
|
||||||
errorChannel <- err
|
errorChannel <- err
|
||||||
}
|
}
|
||||||
}(service, u)
|
}(u)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -84,14 +91,12 @@ type point struct {
|
||||||
// Gathers data from a particular URL
|
// Gathers data from a particular URL
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// acc : The telegraf Accumulator to use
|
// acc : The telegraf Accumulator to use
|
||||||
// service: the service being queried
|
|
||||||
// url : endpoint to send request to
|
// url : endpoint to send request to
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
// error: Any error that may have occurred
|
// error: Any error that may have occurred
|
||||||
func (e *Expvar) gatherURL(
|
func (i *InfluxDBJSON) gatherURL(
|
||||||
acc plugins.Accumulator,
|
acc plugins.Accumulator,
|
||||||
service Service,
|
|
||||||
url string,
|
url string,
|
||||||
) error {
|
) error {
|
||||||
resp, err := http.Get(url)
|
resp, err := http.Get(url)
|
||||||
|
@ -107,7 +112,7 @@ func (e *Expvar) gatherURL(
|
||||||
if t, err := dec.Token(); err != nil {
|
if t, err := dec.Token(); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if t != json.Delim('{') {
|
} else if t != json.Delim('{') {
|
||||||
return errors.New("expvars must be a JSON object")
|
return errors.New("document root must be a JSON object")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Loop through rest of object
|
// Loop through rest of object
|
||||||
|
@ -117,7 +122,7 @@ func (e *Expvar) gatherURL(
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read in a string key. We actually don't care about the top-level keys
|
// Read in a string key. We don't do anything with the top-level keys, so it's discarded.
|
||||||
_, err := dec.Token()
|
_, err := dec.Token()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -131,14 +136,16 @@ func (e *Expvar) gatherURL(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the object was a point, but was not fully initialized, ignore it and move on.
|
||||||
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 {
|
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Tags["expvar_url"] = url
|
// Add a tag to indicate the source of the data.
|
||||||
|
p.Tags["influxdbjson_url"] = url
|
||||||
|
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
service.Name+"_"+p.Name,
|
i.Name+"_"+p.Name,
|
||||||
p.Values,
|
p.Values,
|
||||||
p.Tags,
|
p.Tags,
|
||||||
)
|
)
|
||||||
|
@ -148,7 +155,7 @@ func (e *Expvar) gatherURL(
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
plugins.Add("expvar", func() plugins.Plugin {
|
plugins.Add("influxdbjson", func() plugins.Plugin {
|
||||||
return &Expvar{}
|
return &InfluxDBJSON{}
|
||||||
})
|
})
|
||||||
}
|
}
|
|
@ -1,10 +1,11 @@
|
||||||
package expvar
|
package influxdbjson_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/plugins/influxdbjson"
|
||||||
"github.com/influxdb/telegraf/testutil"
|
"github.com/influxdb/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -55,7 +56,7 @@ func TestBasic(t *testing.T) {
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.URL.Path == "/expvar" {
|
if r.URL.Path == "/endpoint" {
|
||||||
_, _ = w.Write([]byte(js))
|
_, _ = w.Write([]byte(js))
|
||||||
} else {
|
} else {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -63,17 +64,13 @@ func TestBasic(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
defer fakeServer.Close()
|
defer fakeServer.Close()
|
||||||
|
|
||||||
expvar := &Expvar{
|
plugin := &influxdbjson.InfluxDBJSON{
|
||||||
Services: []Service{
|
|
||||||
{
|
|
||||||
Name: "test",
|
Name: "test",
|
||||||
URLs: []string{fakeServer.URL + "/expvar"},
|
URLs: []string{fakeServer.URL + "/endpoint"},
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
require.NoError(t, expvar.Gather(&acc))
|
require.NoError(t, plugin.Gather(&acc))
|
||||||
|
|
||||||
require.Len(t, acc.Points, 2)
|
require.Len(t, acc.Points, 2)
|
||||||
require.NoError(t, acc.ValidateTaggedFieldsValue(
|
require.NoError(t, acc.ValidateTaggedFieldsValue(
|
||||||
|
@ -88,7 +85,7 @@ func TestBasic(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]string{
|
map[string]string{
|
||||||
"id": "ex1",
|
"id": "ex1",
|
||||||
"expvar_url": fakeServer.URL + "/expvar",
|
"influxdbjson_url": fakeServer.URL + "/endpoint",
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
require.NoError(t, acc.ValidateTaggedFieldsValue(
|
require.NoError(t, acc.ValidateTaggedFieldsValue(
|
||||||
|
@ -98,7 +95,7 @@ func TestBasic(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]string{
|
map[string]string{
|
||||||
"id": "ex2",
|
"id": "ex2",
|
||||||
"expvar_url": fakeServer.URL + "/expvar",
|
"influxdbjson_url": fakeServer.URL + "/endpoint",
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}
|
}
|
Loading…
Reference in New Issue