fix issue with execd-multiline influx line protocol (#7463)

This commit is contained in:
Steven Soroka 2020-05-05 17:43:45 -04:00 committed by GitHub
parent f25936b796
commit 022ff63d29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 0 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
) )
const sampleConfig = ` const sampleConfig = `
@ -197,6 +198,12 @@ func (e *Execd) cmdWait() error {
} }
func (e *Execd) cmdReadOut(out io.Reader) { func (e *Execd) cmdReadOut(out io.Reader) {
if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
// work around the lack of built-in streaming parser. :(
e.cmdReadOutStream(out)
return
}
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)
for scanner.Scan() { for scanner.Scan() {
@ -215,6 +222,29 @@ func (e *Execd) cmdReadOut(out io.Reader) {
} }
} }
func (e *Execd) cmdReadOutStream(out io.Reader) {
parser := influx.NewStreamParser(out)
for {
metric, err := parser.Next()
if err != nil {
if err == influx.EOF {
break // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
// parse error.
e.acc.AddError(parseErr)
continue
}
// some non-recoverable error?
e.acc.AddError(err)
return
}
e.acc.AddMetric(metric)
}
}
func (e *Execd) cmdReadErr(out io.Reader) { func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)

View File

@ -3,6 +3,8 @@
package execd package execd
import ( import (
"fmt"
"strings"
"testing" "testing"
"time" "time"
@ -47,6 +49,51 @@ func TestExternalInputWorks(t *testing.T) {
e.Gather(acc) e.Gather(acc)
} }
func TestParsesLinesContainingNewline(t *testing.T) {
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
e := &Execd{
Command: []string{shell(), fileShellScriptPath()},
RestartDelay: config.Duration(5 * time.Second),
parser: parser,
Signal: "STDIN",
acc: acc,
}
cases := []struct {
Name string
Value string
}{
{
Name: "no-newline",
Value: "my message",
}, {
Name: "newline",
Value: "my\nmessage",
},
}
for _, test := range cases {
t.Run(test.Name, func(t *testing.T) {
line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value)
e.cmdReadOut(strings.NewReader(line))
m := readChanWithTimeout(t, metrics, 1*time.Second)
require.Equal(t, "event", m.Name())
val, ok := m.GetField("message")
require.True(t, ok)
require.Equal(t, test.Value, val)
})
}
}
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout) to := time.NewTimer(timeout)
defer to.Stop() defer to.Stop()