Add exec plugin
This commit is contained in:
		
							parent
							
								
									91f6c4b740
								
							
						
					
					
						commit
						090c0a60fa
					
				|  | @ -3,6 +3,7 @@ package all | ||||||
| import ( | import ( | ||||||
| 	_ "github.com/influxdb/telegraf/plugins/disque" | 	_ "github.com/influxdb/telegraf/plugins/disque" | ||||||
| 	_ "github.com/influxdb/telegraf/plugins/elasticsearch" | 	_ "github.com/influxdb/telegraf/plugins/elasticsearch" | ||||||
|  | 	_ "github.com/influxdb/telegraf/plugins/exec" | ||||||
| 	_ "github.com/influxdb/telegraf/plugins/haproxy" | 	_ "github.com/influxdb/telegraf/plugins/haproxy" | ||||||
| 	_ "github.com/influxdb/telegraf/plugins/kafka_consumer" | 	_ "github.com/influxdb/telegraf/plugins/kafka_consumer" | ||||||
| 	_ "github.com/influxdb/telegraf/plugins/lustre2" | 	_ "github.com/influxdb/telegraf/plugins/lustre2" | ||||||
|  |  | ||||||
|  | @ -0,0 +1,129 @@ | ||||||
|  | package exec | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/bitly/go-simplejson" | ||||||
|  | 	"github.com/gonuts/go-shellquote" | ||||||
|  | 	"github.com/influxdb/telegraf/plugins" | ||||||
|  | 	"os/exec" | ||||||
|  | 	"sync" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const sampleConfig = ` | ||||||
|  | # specify commands via an array of tables | ||||||
|  | [[exec.commands]] | ||||||
|  | # the command to run | ||||||
|  | command = "/usr/bin/mycollector --foo=bar" | ||||||
|  | 
 | ||||||
|  | # name of the command (used as a prefix for measurements) | ||||||
|  | name = "mycollector" | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | type Command struct { | ||||||
|  | 	Command string | ||||||
|  | 	Name    string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Exec struct { | ||||||
|  | 	Commands []*Command | ||||||
|  | 	runner   Runner | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Runner interface { | ||||||
|  | 	Run(string, ...string) ([]byte, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type CommandRunner struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewExec() *Exec { | ||||||
|  | 	return &Exec{runner: CommandRunner{}} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { | ||||||
|  | 	cmd := exec.Command(command, args...) | ||||||
|  | 	var out bytes.Buffer | ||||||
|  | 	cmd.Stdout = &out | ||||||
|  | 
 | ||||||
|  | 	if err := cmd.Run(); err != nil { | ||||||
|  | 		return nil, fmt.Errorf("exec: %s for command '%s'", err, command) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return out.Bytes(), nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *Exec) SampleConfig() string { | ||||||
|  | 	return sampleConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *Exec) Description() string { | ||||||
|  | 	return "Read flattened metrics from one or more commands that output JSON to stdout" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *Exec) Gather(acc plugins.Accumulator) error { | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 
 | ||||||
|  | 	var outerr error | ||||||
|  | 
 | ||||||
|  | 	for _, c := range e.Commands { | ||||||
|  | 		wg.Add(1) | ||||||
|  | 		go func(c *Command, acc plugins.Accumulator) { | ||||||
|  | 			defer wg.Done() | ||||||
|  | 			outerr = e.gatherCommand(c, acc) | ||||||
|  | 		}(c, acc) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	wg.Wait() | ||||||
|  | 
 | ||||||
|  | 	return outerr | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { | ||||||
|  | 	words, err := shellquote.Split(c.Command) | ||||||
|  | 	if err != nil || len(words) == 0 { | ||||||
|  | 		return fmt.Errorf("exec: unable to parse command, %s", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	out, err := e.runner.Run(words[0], words[1:]...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	jsonOut, err := simplejson.NewJson(out) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return processResponse(acc, c.Name, map[string]string{}, jsonOut.Interface()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error { | ||||||
|  | 	switch t := v.(type) { | ||||||
|  | 	case map[string]interface{}: | ||||||
|  | 		for k, v := range t { | ||||||
|  | 			if err := processResponse(acc, prefix+"_"+k, tags, v); err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	case json.Number: | ||||||
|  | 		value, err := v.(json.Number).Float64() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		acc.Add(prefix, value, tags) | ||||||
|  | 	case bool, string, []interface{}: | ||||||
|  | 		// ignored types
 | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		return fmt.Errorf("exec: got unexpected type %T with value %v (%s)", t, v, prefix) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	plugins.Add("exec", func() plugins.Plugin { | ||||||
|  | 		return NewExec() | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | @ -0,0 +1,88 @@ | ||||||
|  | package exec | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/influxdb/telegraf/testutil" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | 	"github.com/stretchr/testify/require" | ||||||
|  | 	"testing" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const validJson = ` | ||||||
|  | { | ||||||
|  |     "status": "green", | ||||||
|  |     "num_processes": 82, | ||||||
|  |     "cpu": { | ||||||
|  |         "status": "red", | ||||||
|  |         "used": 8234, | ||||||
|  |         "free": 32 | ||||||
|  |     }, | ||||||
|  |     "percent": 0.81, | ||||||
|  |     "users": [0, 1, 2, 3] | ||||||
|  | }` | ||||||
|  | 
 | ||||||
|  | const malformedJson = ` | ||||||
|  | { | ||||||
|  |     "status": "green", | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | type runnerMock struct { | ||||||
|  | 	out []byte | ||||||
|  | 	err error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newRunnerMock(out []byte, err error) Runner { | ||||||
|  | 	return &runnerMock{out: out, err: err} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (r runnerMock) Run(command string, args ...string) ([]byte, error) { | ||||||
|  | 	if r.err != nil { | ||||||
|  | 		return nil, r.err | ||||||
|  | 	} | ||||||
|  | 	return r.out, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestExec(t *testing.T) { | ||||||
|  | 	runner := newRunnerMock([]byte(validJson), nil) | ||||||
|  | 	command := Command{Command: "testcommand arg1", Name: "mycollector"} | ||||||
|  | 	e := &Exec{runner: runner, Commands: []*Command{&command}} | ||||||
|  | 
 | ||||||
|  | 	var acc testutil.Accumulator | ||||||
|  | 	err := e.Gather(&acc) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	checkFloat := []struct { | ||||||
|  | 		name  string | ||||||
|  | 		value float64 | ||||||
|  | 	}{ | ||||||
|  | 		{"mycollector_num_processes", 82}, | ||||||
|  | 		{"mycollector_cpu_used", 8234}, | ||||||
|  | 		{"mycollector_cpu_free", 32}, | ||||||
|  | 		{"mycollector_percent", 0.81}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, c := range checkFloat { | ||||||
|  | 		assert.True(t, acc.CheckValue(c.name, c.value)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestExecMalformed(t *testing.T) { | ||||||
|  | 	runner := newRunnerMock([]byte(malformedJson), nil) | ||||||
|  | 	command := Command{Command: "badcommand arg1", Name: "mycollector"} | ||||||
|  | 	e := &Exec{runner: runner, Commands: []*Command{&command}} | ||||||
|  | 
 | ||||||
|  | 	var acc testutil.Accumulator | ||||||
|  | 	err := e.Gather(&acc) | ||||||
|  | 	require.Error(t, err) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestCommandError(t *testing.T) { | ||||||
|  | 	runner := newRunnerMock(nil, fmt.Errorf("exit status code 1")) | ||||||
|  | 	command := Command{Command: "badcommand", Name: "mycollector"} | ||||||
|  | 	e := &Exec{runner: runner, Commands: []*Command{&command}} | ||||||
|  | 	var acc testutil.Accumulator | ||||||
|  | 	err := e.Gather(&acc) | ||||||
|  | 	require.Error(t, err) | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue