From 32cbbdbf735213d3d20518ce6a0c042fbe8f045d Mon Sep 17 00:00:00 2001 From: Alvaro Morales Date: Wed, 5 Aug 2015 17:29:27 -0700 Subject: [PATCH 1/2] Add exec plugin --- plugins/all/all.go | 1 + plugins/exec/exec.go | 129 ++++++++++++++++++++++++++++++++++++++ plugins/exec/exec_test.go | 88 ++++++++++++++++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 plugins/exec/exec.go create mode 100644 plugins/exec/exec_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index ab7dfcbbf..1bd332da3 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -3,6 +3,7 @@ package all import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" + _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go new file mode 100644 index 000000000..a8f5bcb95 --- /dev/null +++ b/plugins/exec/exec.go @@ -0,0 +1,129 @@ +package exec + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/bitly/go-simplejson" + "github.com/gonuts/go-shellquote" + "github.com/influxdb/telegraf/plugins" + "os/exec" + "sync" +) + +const sampleConfig = ` +# specify commands via an array of tables +[[exec.commands]] +# the command to run +command = "/usr/bin/mycollector --foo=bar" + +# name of the command (used as a prefix for measurements) +name = "mycollector" +` + +type Command struct { + Command string + Name string +} + +type Exec struct { + Commands []*Command + runner Runner +} + +type Runner interface { + Run(string, ...string) ([]byte, error) +} + +type CommandRunner struct { +} + +func NewExec() *Exec { + return &Exec{runner: CommandRunner{}} +} + +func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { + cmd := exec.Command(command, args...) + var out bytes.Buffer + cmd.Stdout = &out + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("exec: %s for command '%s'", err, command) + } + + return out.Bytes(), nil +} + +func (e *Exec) SampleConfig() string { + return sampleConfig +} + +func (e *Exec) Description() string { + return "Read flattened metrics from one or more commands that output JSON to stdout" +} + +func (e *Exec) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + var outerr error + + for _, c := range e.Commands { + wg.Add(1) + go func(c *Command, acc plugins.Accumulator) { + defer wg.Done() + outerr = e.gatherCommand(c, acc) + }(c, acc) + } + + wg.Wait() + + return outerr +} + +func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { + words, err := shellquote.Split(c.Command) + if err != nil || len(words) == 0 { + return fmt.Errorf("exec: unable to parse command, %s", err) + } + + out, err := e.runner.Run(words[0], words[1:]...) + if err != nil { + return err + } + + jsonOut, err := simplejson.NewJson(out) + if err != nil { + return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) + } + + return processResponse(acc, c.Name, map[string]string{}, jsonOut.Interface()) +} + +func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error { + switch t := v.(type) { + case map[string]interface{}: + for k, v := range t { + if err := processResponse(acc, prefix+"_"+k, tags, v); err != nil { + return err + } + } + case json.Number: + value, err := v.(json.Number).Float64() + if err != nil { + return err + } + acc.Add(prefix, value, tags) + case bool, string, []interface{}: + // ignored types + return nil + default: + return fmt.Errorf("exec: got unexpected type %T with value %v (%s)", t, v, prefix) + } + return nil +} + +func init() { + plugins.Add("exec", func() plugins.Plugin { + return NewExec() + }) +} diff --git a/plugins/exec/exec_test.go b/plugins/exec/exec_test.go new file mode 100644 index 000000000..051e5a3d5 --- /dev/null +++ b/plugins/exec/exec_test.go @@ -0,0 +1,88 @@ +package exec + +import ( + "fmt" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +const validJson = ` +{ + "status": "green", + "num_processes": 82, + "cpu": { + "status": "red", + "used": 8234, + "free": 32 + }, + "percent": 0.81, + "users": [0, 1, 2, 3] +}` + +const malformedJson = ` +{ + "status": "green", +` + +type runnerMock struct { + out []byte + err error +} + +func newRunnerMock(out []byte, err error) Runner { + return &runnerMock{out: out, err: err} +} + +func (r runnerMock) Run(command string, args ...string) ([]byte, error) { + if r.err != nil { + return nil, r.err + } + return r.out, nil +} + +func TestExec(t *testing.T) { + runner := newRunnerMock([]byte(validJson), nil) + command := Command{Command: "testcommand arg1", Name: "mycollector"} + e := &Exec{runner: runner, Commands: []*Command{&command}} + + var acc testutil.Accumulator + err := e.Gather(&acc) + require.NoError(t, err) + + checkFloat := []struct { + name string + value float64 + }{ + {"mycollector_num_processes", 82}, + {"mycollector_cpu_used", 8234}, + {"mycollector_cpu_free", 32}, + {"mycollector_percent", 0.81}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored") +} + +func TestExecMalformed(t *testing.T) { + runner := newRunnerMock([]byte(malformedJson), nil) + command := Command{Command: "badcommand arg1", Name: "mycollector"} + e := &Exec{runner: runner, Commands: []*Command{&command}} + + var acc testutil.Accumulator + err := e.Gather(&acc) + require.Error(t, err) +} + +func TestCommandError(t *testing.T) { + runner := newRunnerMock(nil, fmt.Errorf("exit status code 1")) + command := Command{Command: "badcommand", Name: "mycollector"} + e := &Exec{runner: runner, Commands: []*Command{&command}} + var acc testutil.Accumulator + err := e.Gather(&acc) + require.Error(t, err) +} From ad2e0bc4e3aba7eb87fd94aafcc6cae80bb9d366 Mon Sep 17 00:00:00 2001 From: Alvaro Morales Date: Thu, 6 Aug 2015 12:01:42 -0700 Subject: [PATCH 2/2] Remove simplejson dependency in exec plugin --- plugins/exec/exec.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index a8f5bcb95..35637aa7f 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/bitly/go-simplejson" "github.com/gonuts/go-shellquote" "github.com/influxdb/telegraf/plugins" "os/exec" @@ -91,12 +90,13 @@ func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { return err } - jsonOut, err := simplejson.NewJson(out) + var jsonOut interface{} + err = json.Unmarshal(out, &jsonOut) if err != nil { return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) } - return processResponse(acc, c.Name, map[string]string{}, jsonOut.Interface()) + return processResponse(acc, c.Name, map[string]string{}, jsonOut) } func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error { @@ -107,12 +107,8 @@ func processResponse(acc plugins.Accumulator, prefix string, tags map[string]str return err } } - case json.Number: - value, err := v.(json.Number).Float64() - if err != nil { - return err - } - acc.Add(prefix, value, tags) + case float64: + acc.Add(prefix, v, tags) case bool, string, []interface{}: // ignored types return nil