diff --git a/README.md b/README.md index 6587ec3bc..7e5f18954 100644 --- a/README.md +++ b/README.md @@ -367,6 +367,7 @@ For documentation on the latest development code see the [documentation index][d * [datadog](./plugins/outputs/datadog) * [discard](./plugins/outputs/discard) * [elasticsearch](./plugins/outputs/elasticsearch) +* [exec](./plugins/output/exec) * [file](./plugins/outputs/file) * [graphite](./plugins/outputs/graphite) * [graylog](./plugins/outputs/graylog) diff --git a/internal/internal.go b/internal/internal.go index a38f7703a..6f135938a 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -230,17 +230,17 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error { timer := time.AfterFunc(timeout, func() { err := c.Process.Kill() if err != nil { - log.Printf("E! FATAL error killing process: %s", err) + log.Printf("E! [agent] Error killing process: %s", err) return } }) err := c.Wait() - isTimeout := timer.Stop() + if err == nil { + return nil + } - if err != nil { - return err - } else if isTimeout == false { + if !timer.Stop() { return TimeoutErr } diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index f9dd73c44..e40230993 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -11,6 +11,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/discard" _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" + _ "github.com/influxdata/telegraf/plugins/outputs/exec" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" diff --git a/plugins/outputs/exec/README.md b/plugins/outputs/exec/README.md new file mode 100644 index 000000000..2b0c2d3f1 --- /dev/null +++ b/plugins/outputs/exec/README.md @@ -0,0 +1,26 @@ +# Exec Output Plugin + +This plugin sends telegraf metrics to an external application over stdin. + +The command should be defined similar to docker's `exec` form: + + ["executable", "param1", "param2"] + +On non-zero exit stderr will be logged at error level. + +### Configuration + +```toml +[[outputs.exec]] + ## Command to injest metrics via stdin. + command = ["tee", "-a", "/dev/null"] + + ## Timeout for command to complete. + # timeout = "5s" + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + # data_format = "influx" +``` diff --git a/plugins/outputs/exec/exec.go b/plugins/outputs/exec/exec.go new file mode 100644 index 000000000..583646bb5 --- /dev/null +++ b/plugins/outputs/exec/exec.go @@ -0,0 +1,153 @@ +package exec + +import ( + "bytes" + "fmt" + "io" + "log" + "os/exec" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +const maxStderrBytes = 512 + +// Exec defines the exec output plugin. +type Exec struct { + Command []string `toml:"command"` + Timeout internal.Duration `toml:"timeout"` + + runner Runner + serializer serializers.Serializer +} + +var sampleConfig = ` + ## Command to injest metrics via stdin. + command = ["tee", "-a", "/dev/null"] + + ## Timeout for command to complete. + # timeout = "5s" + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + # data_format = "influx" +` + +// SetSerializer sets the serializer for the output. +func (e *Exec) SetSerializer(serializer serializers.Serializer) { + e.serializer = serializer +} + +// Connect satisfies the Ouput interface. +func (e *Exec) Connect() error { + return nil +} + +// Close satisfies the Ouput interface. +func (e *Exec) Close() error { + return nil +} + +// Description describes the plugin. +func (e *Exec) Description() string { + return "Send metrics to command as input over stdin" +} + +// SampleConfig returns a sample configuration. +func (e *Exec) SampleConfig() string { + return sampleConfig +} + +// Write writes the metrics to the configured command. +func (e *Exec) Write(metrics []telegraf.Metric) error { + var buffer bytes.Buffer + for _, metric := range metrics { + value, err := e.serializer.Serialize(metric) + if err != nil { + return err + } + buffer.Write(value) + } + + if buffer.Len() <= 0 { + return nil + } + + return e.runner.Run(e.Timeout.Duration, e.Command, &buffer) +} + +// Runner provides an interface for running exec.Cmd. +type Runner interface { + Run(time.Duration, []string, io.Reader) error +} + +// CommandRunner runs a command with the ability to kill the process before the timeout. +type CommandRunner struct { + cmd *exec.Cmd +} + +// Run runs the command. +func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.Reader) error { + cmd := exec.Command(command[0], command[1:]...) + cmd.Stdin = buffer + var stderr bytes.Buffer + cmd.Stderr = &stderr + + err := internal.RunTimeout(cmd, timeout) + s := stderr + + if err != nil { + if err == internal.TimeoutErr { + return fmt.Errorf("%q timed out and was killed", command) + } + + if s.Len() > 0 { + log.Printf("E! [outputs.exec] Command error: %q", truncate(s)) + } + + if status, ok := internal.ExitStatus(err); ok { + return fmt.Errorf("%q exited %d with %s", command, status, err.Error()) + } + + return fmt.Errorf("%q failed with %s", command, err.Error()) + } + + c.cmd = cmd + + return nil +} + +func truncate(buf bytes.Buffer) string { + // Limit the number of bytes. + didTruncate := false + if buf.Len() > maxStderrBytes { + buf.Truncate(maxStderrBytes) + didTruncate = true + } + if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 { + // Only show truncation if the newline wasn't the last character. + if i < buf.Len()-1 { + didTruncate = true + } + buf.Truncate(i) + } + if didTruncate { + buf.WriteString("...") + } + return buf.String() +} + +func init() { + outputs.Add("exec", func() telegraf.Output { + return &Exec{ + runner: &CommandRunner{}, + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} diff --git a/plugins/outputs/exec/exec_test.go b/plugins/outputs/exec/exec_test.go new file mode 100644 index 000000000..850ba7328 --- /dev/null +++ b/plugins/outputs/exec/exec_test.go @@ -0,0 +1,105 @@ +package exec + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" +) + +func TestExec(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test due to OS/executable dependencies") + } + + tests := []struct { + name string + command []string + err bool + metrics []telegraf.Metric + }{ + { + name: "test success", + command: []string{"tee"}, + err: false, + metrics: testutil.MockMetrics(), + }, + { + name: "test doesn't accept stdin", + command: []string{"sleep", "5s"}, + err: true, + metrics: testutil.MockMetrics(), + }, + { + name: "test command not found", + command: []string{"/no/exist", "-h"}, + err: true, + metrics: testutil.MockMetrics(), + }, + { + name: "test no metrics output", + command: []string{"tee"}, + err: false, + metrics: []telegraf.Metric{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &Exec{ + Command: tt.command, + Timeout: internal.Duration{Duration: time.Second}, + runner: &CommandRunner{}, + } + + s, _ := serializers.NewInfluxSerializer() + e.SetSerializer(s) + + e.Connect() + + require.Equal(t, tt.err, e.Write(tt.metrics) != nil) + }) + } +} + +func TestTruncate(t *testing.T) { + tests := []struct { + name string + buf *bytes.Buffer + len int + }{ + { + name: "long out", + buf: bytes.NewBufferString(strings.Repeat("a", maxStderrBytes+100)), + len: maxStderrBytes + len("..."), + }, + { + name: "multiline out", + buf: bytes.NewBufferString("hola\ngato\n"), + len: len("hola") + len("..."), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := truncate(*tt.buf) + require.Equal(t, tt.len, len(s)) + }) + } +} + +func TestExecDocs(t *testing.T) { + e := &Exec{} + e.Description() + e.SampleConfig() + require.NoError(t, e.Close()) + + e = &Exec{runner: &CommandRunner{}} + require.NoError(t, e.Close()) +}