parent
2f73f95819
commit
434a14978f
|
@ -8,6 +8,7 @@
|
||||||
- [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch!
|
- [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch!
|
||||||
- [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics.
|
- [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics.
|
||||||
- [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen!
|
- [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen!
|
||||||
|
- [#617](https://github.com/influxdata/telegraf/pull/617): exec plugin: parse influx line protocol in addition to JSON.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements.
|
- [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements.
|
||||||
|
|
|
@ -1,28 +1,39 @@
|
||||||
# Exec Plugin
|
# Exec Input Plugin
|
||||||
|
|
||||||
The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds
|
The exec plugin can execute arbitrary commands which output JSON or
|
||||||
all numeric values, treating them as floats.
|
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).
|
||||||
|
|
||||||
For example, if you have a json-returning command called mycollector, you could
|
If using JSON, only numeric values are parsed and turned into floats. Booleans
|
||||||
setup the exec plugin with:
|
and strings will be ignored.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
```
|
```
|
||||||
|
# Read flattened metrics from one or more commands that output JSON to stdout
|
||||||
[[inputs.exec]]
|
[[inputs.exec]]
|
||||||
command = "/usr/bin/mycollector --output=json"
|
# the command to run
|
||||||
|
command = "/usr/bin/mycollector --foo=bar"
|
||||||
|
|
||||||
|
# Data format to consume. This can be "json" or "influx" (line-protocol)
|
||||||
|
# NOTE json only reads numerical measurements, strings and booleans are ignored.
|
||||||
|
data_format = "json"
|
||||||
|
|
||||||
|
# measurement name suffix (for separating different commands)
|
||||||
name_suffix = "_mycollector"
|
name_suffix = "_mycollector"
|
||||||
interval = "10s"
|
|
||||||
```
|
```
|
||||||
|
|
||||||
The name suffix is appended to exec as "exec_name_suffix" to identify the input stream.
|
Other options for modifying the measurement names are:
|
||||||
|
|
||||||
The interval is used to determine how often a particular command should be run. Each
|
```
|
||||||
time the exec plugin runs, it will only run a particular command if it has been at least
|
name_override = "measurement_name"
|
||||||
`interval` seconds since the exec plugin last ran the command.
|
name_prefix = "prefix_"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example 1
|
||||||
|
|
||||||
# Sample
|
Let's say that we have the above configuration, and mycollector outputs the
|
||||||
|
following JSON:
|
||||||
|
|
||||||
Let's say that we have a command with the name_suffix "_mycollector", which gives the following output:
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"a": 0.5,
|
"a": 0.5,
|
||||||
|
@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
The collected metrics will be stored as field values under the same measurement "exec_mycollector":
|
The collected metrics will be stored as fields under the measurement
|
||||||
|
"exec_mycollector":
|
||||||
|
|
||||||
```
|
```
|
||||||
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
|
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
|
||||||
```
|
```
|
||||||
|
|
||||||
Other options for modifying the measurement names are:
|
### Example 2
|
||||||
|
|
||||||
|
Now let's say we have the following configuration:
|
||||||
|
|
||||||
```
|
```
|
||||||
name_override = "newname"
|
[[inputs.exec]]
|
||||||
name_prefix = "prefix_"
|
# the command to run
|
||||||
|
command = "/usr/bin/line_protocol_collector"
|
||||||
|
|
||||||
|
# Data format to consume. This can be "json" or "influx" (line-protocol)
|
||||||
|
# NOTE json only reads numerical measurements, strings and booleans are ignored.
|
||||||
|
data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
And line_protocol_collector outputs the following line protocol:
|
||||||
|
|
||||||
|
```
|
||||||
|
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
```
|
||||||
|
|
||||||
|
You will get data in InfluxDB exactly as it is defined above,
|
||||||
|
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
|
||||||
|
and usage_busy. They will receive a timestamp at collection time.
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gonuts/go-shellquote"
|
"github.com/gonuts/go-shellquote"
|
||||||
|
|
||||||
|
@ -14,18 +15,20 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
# NOTE This plugin only reads numerical measurements, strings and booleans
|
|
||||||
# will be ignored.
|
|
||||||
|
|
||||||
# the command to run
|
# the command to run
|
||||||
command = "/usr/bin/mycollector --foo=bar"
|
command = "/usr/bin/mycollector --foo=bar"
|
||||||
|
|
||||||
|
# Data format to consume. This can be "json" or "influx" (line-protocol)
|
||||||
|
# NOTE json only reads numerical measurements, strings and booleans are ignored.
|
||||||
|
data_format = "json"
|
||||||
|
|
||||||
# measurement name suffix (for separating different commands)
|
# measurement name suffix (for separating different commands)
|
||||||
name_suffix = "_mycollector"
|
name_suffix = "_mycollector"
|
||||||
`
|
`
|
||||||
|
|
||||||
type Exec struct {
|
type Exec struct {
|
||||||
Command string
|
Command string
|
||||||
|
DataFormat string
|
||||||
|
|
||||||
runner Runner
|
runner Runner
|
||||||
}
|
}
|
||||||
|
@ -71,6 +74,8 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch e.DataFormat {
|
||||||
|
case "", "json":
|
||||||
var jsonOut interface{}
|
var jsonOut interface{}
|
||||||
err = json.Unmarshal(out, &jsonOut)
|
err = json.Unmarshal(out, &jsonOut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -83,8 +88,18 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
acc.AddFields("exec", f.Fields, nil)
|
acc.AddFields("exec", f.Fields, nil)
|
||||||
|
case "influx":
|
||||||
|
now := time.Now()
|
||||||
|
metrics, err := telegraf.ParseMetrics(out)
|
||||||
|
for _, metric := range metrics {
|
||||||
|
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
|
||||||
|
"or influx.", e.DataFormat)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,18 @@ const malformedJson = `
|
||||||
"status": "green",
|
"status": "green",
|
||||||
`
|
`
|
||||||
|
|
||||||
|
const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1"
|
||||||
|
|
||||||
|
const lineProtocolMulti = `
|
||||||
|
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
`
|
||||||
|
|
||||||
type runnerMock struct {
|
type runnerMock struct {
|
||||||
out []byte
|
out []byte
|
||||||
err error
|
err error
|
||||||
|
@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
|
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLineProtocolParse(t *testing.T) {
|
||||||
|
e := &Exec{
|
||||||
|
runner: newRunnerMock([]byte(lineProtocol), nil),
|
||||||
|
Command: "line-protocol",
|
||||||
|
DataFormat: "influx",
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := e.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(99),
|
||||||
|
"usage_busy": float64(1),
|
||||||
|
}
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "foo",
|
||||||
|
"datacenter": "us-east",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLineProtocolParseMultiple(t *testing.T) {
|
||||||
|
e := &Exec{
|
||||||
|
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
|
||||||
|
Command: "line-protocol",
|
||||||
|
DataFormat: "influx",
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := e.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(99),
|
||||||
|
"usage_busy": float64(1),
|
||||||
|
}
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "foo",
|
||||||
|
"datacenter": "us-east",
|
||||||
|
}
|
||||||
|
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}
|
||||||
|
|
||||||
|
for _, cpu := range cpuTags {
|
||||||
|
tags["cpu"] = cpu
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInvalidDataFormat(t *testing.T) {
|
||||||
|
e := &Exec{
|
||||||
|
runner: newRunnerMock([]byte(lineProtocol), nil),
|
||||||
|
Command: "bad data format",
|
||||||
|
DataFormat: "FooBar",
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := e.Gather(&acc)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue