Allow exec plugin to parse line-protocol

closes #613
This commit is contained in:
Cameron Sparr 2016-01-29 11:30:35 -07:00
parent 4ea3f82e50
commit b97027ac9a
4 changed files with 159 additions and 33 deletions

View File

@ -8,6 +8,7 @@
- [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch! - [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch!
- [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics. - [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics.
- [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen! - [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen!
- [#617](https://github.com/influxdata/telegraf/pull/617): exec plugin: parse influx line protocol in addition to JSON.
### Bugfixes ### Bugfixes
- [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements. - [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements.

View File

@ -1,28 +1,39 @@
# Exec Plugin # Exec Input Plugin
The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds The exec plugin can execute arbitrary commands which output JSON or
all numeric values, treating them as floats. InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).
For example, if you have a json-returning command called mycollector, you could If using JSON, only numeric values are parsed and turned into floats. Booleans
setup the exec plugin with: and strings will be ignored.
### Configuration
``` ```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]] [[inputs.exec]]
command = "/usr/bin/mycollector --output=json" # the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector" name_suffix = "_mycollector"
interval = "10s"
``` ```
The name suffix is appended to exec as "exec_name_suffix" to identify the input stream. Other options for modifying the measurement names are:
The interval is used to determine how often a particular command should be run. Each ```
time the exec plugin runs, it will only run a particular command if it has been at least name_override = "measurement_name"
`interval` seconds since the exec plugin last ran the command. name_prefix = "prefix_"
```
### Example 1
# Sample Let's say that we have the above configuration, and mycollector outputs the
following JSON:
Let's say that we have a command with the name_suffix "_mycollector", which gives the following output:
```json ```json
{ {
"a": 0.5, "a": 0.5,
@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give
} }
``` ```
The collected metrics will be stored as field values under the same measurement "exec_mycollector": The collected metrics will be stored as fields under the measurement
"exec_mycollector":
``` ```
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567 exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
``` ```
Other options for modifying the measurement names are: ### Example 2
Now let's say we have the following configuration:
``` ```
name_override = "newname" [[inputs.exec]]
name_prefix = "prefix_" # the command to run
command = "/usr/bin/line_protocol_collector"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "influx"
``` ```
And line_protocol_collector outputs the following line protocol:
```
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
```
You will get data in InfluxDB exactly as it is defined above,
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
and usage_busy. They will receive a timestamp at collection time.

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os/exec" "os/exec"
"time"
"github.com/gonuts/go-shellquote" "github.com/gonuts/go-shellquote"
@ -14,18 +15,20 @@ import (
) )
const sampleConfig = ` const sampleConfig = `
# NOTE This plugin only reads numerical measurements, strings and booleans
# will be ignored.
# the command to run # the command to run
command = "/usr/bin/mycollector --foo=bar" command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands) # measurement name suffix (for separating different commands)
name_suffix = "_mycollector" name_suffix = "_mycollector"
` `
type Exec struct { type Exec struct {
Command string Command string
DataFormat string
runner Runner runner Runner
} }
@ -71,6 +74,8 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
return err return err
} }
switch e.DataFormat {
case "", "json":
var jsonOut interface{} var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut) err = json.Unmarshal(out, &jsonOut)
if err != nil { if err != nil {
@ -83,8 +88,18 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
acc.AddFields("exec", f.Fields, nil) acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
}
return nil return nil
} }

View File

@ -31,6 +31,18 @@ const malformedJson = `
"status": "green", "status": "green",
` `
const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1"
const lineProtocolMulti = `
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
type runnerMock struct { type runnerMock struct {
out []byte out []byte
err error err error
@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
require.Error(t, err) require.Error(t, err)
assert.Equal(t, acc.NFields(), 0, "No new points should have been added") assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
} }
func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
DataFormat: "influx",
}
var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)
fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Command: "line-protocol",
DataFormat: "influx",
}
var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)
fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}
for _, cpu := range cpuTags {
tags["cpu"] = cpu
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
}
func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
DataFormat: "FooBar",
}
var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}