From 022ff63d29c30709c1bcd4efad1db80059775e47 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Tue, 5 May 2020 17:43:45 -0400 Subject: [PATCH] fix issue with execd-multiline influx line protocol (#7463) --- plugins/inputs/execd/execd.go | 30 +++++++++++++++++++ plugins/inputs/execd/execd_test.go | 47 ++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index b162c9776..1ea136a3d 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" ) const sampleConfig = ` @@ -197,6 +198,12 @@ func (e *Execd) cmdWait() error { } func (e *Execd) cmdReadOut(out io.Reader) { + if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser { + // work around the lack of built-in streaming parser. :( + e.cmdReadOutStream(out) + return + } + scanner := bufio.NewScanner(out) for scanner.Scan() { @@ -215,6 +222,29 @@ func (e *Execd) cmdReadOut(out io.Reader) { } } +func (e *Execd) cmdReadOutStream(out io.Reader) { + parser := influx.NewStreamParser(out) + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + break // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + // parse error. + e.acc.AddError(parseErr) + continue + } + // some non-recoverable error? + e.acc.AddError(err) + return + } + + e.acc.AddMetric(metric) + } +} + func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 1c687a9df..52c0a214b 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -3,6 +3,8 @@ package execd import ( + "fmt" + "strings" "testing" "time" @@ -47,6 +49,51 @@ func TestExternalInputWorks(t *testing.T) { e.Gather(acc) } +func TestParsesLinesContainingNewline(t *testing.T) { + parser, err := parsers.NewInfluxParser() + require.NoError(t, err) + + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + acc := agent.NewAccumulator(&TestMetricMaker{}, metrics) + + e := &Execd{ + Command: []string{shell(), fileShellScriptPath()}, + RestartDelay: config.Duration(5 * time.Second), + parser: parser, + Signal: "STDIN", + acc: acc, + } + + cases := []struct { + Name string + Value string + }{ + { + Name: "no-newline", + Value: "my message", + }, { + Name: "newline", + Value: "my\nmessage", + }, + } + + for _, test := range cases { + t.Run(test.Name, func(t *testing.T) { + line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value) + + e.cmdReadOut(strings.NewReader(line)) + + m := readChanWithTimeout(t, metrics, 1*time.Second) + + require.Equal(t, "event", m.Name()) + val, ok := m.GetField("message") + require.True(t, ok) + require.Equal(t, test.Value, val) + }) + } +} + func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { to := time.NewTimer(timeout) defer to.Stop()