diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 245010764..da5b81a60 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -9,11 +9,11 @@ import ( "sync" "github.com/influxdata/tail" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" ) const ( @@ -172,55 +172,64 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { // create a goroutine for each "tailer" t.wg.Add(1) - go t.receiver(parser, tailer) + go func() { + defer t.wg.Done() + t.receiver(parser, tailer) + }() t.tailers[tailer.Filename] = tailer } } return nil } -// this is launched as a goroutine to continuously watch a tailed logfile +// ParseLine parses a line of text. +func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) { + switch parser.(type) { + case *csv.Parser: + // The csv parser parses headers in Parse and skips them in ParseLine. + // As a temporary solution call Parse only when getting the first + // line from the file. + if firstLine { + return parser.Parse([]byte(line)) + } else { + m, err := parser.ParseLine(line) + if err != nil { + return nil, err + } + + if m != nil { + return []telegraf.Metric{m}, nil + } + return []telegraf.Metric{}, nil + } + default: + return parser.Parse([]byte(line)) + } +} + +// Receiver is launched as a goroutine to continuously watch a tailed logfile // for changes, parse any incoming msgs, and add to the accumulator. func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { - defer t.wg.Done() - var firstLine = true - var metrics []telegraf.Metric - var m telegraf.Metric - var err error - var line *tail.Line - for line = range tailer.Lines { + for line := range tailer.Lines { if line.Err != nil { - t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err)) + t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, line.Err)) continue } // Fix up files with Windows line endings. text := strings.TrimRight(line.Text, "\r") - if firstLine { - metrics, err = parser.Parse([]byte(text)) - if err == nil { - if len(metrics) == 0 { - firstLine = false - continue - } else { - m = metrics[0] - } - } - firstLine = false - } else { - m, err = parser.ParseLine(text) - } - - if err == nil { - if m != nil { - tags := m.Tags() - tags["path"] = tailer.Filename - t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) - } - } else { + metrics, err := parseLine(parser, text, firstLine) + if err != nil { t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s", tailer.Filename, line.Text, err)) + continue + } + firstLine = false + + for _, metric := range metrics { + metric.AddTag("path", tailer.Filename) + t.acc.AddMetric(metric) } } diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index fb5e05a76..41db76cac 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -5,10 +5,13 @@ import ( "os" "runtime" "testing" + "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" + "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -139,3 +142,117 @@ func TestTailDosLineendings(t *testing.T) { "usage_idle": float64(200), }) } + +// The csv parser should only parse the header line once per file. +func TestCSVHeadersParsedOnce(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "") + require.NoError(t, err) + defer func() { + tmpfile.Close() + os.Remove(tmpfile.Name()) + }() + + _, err = tmpfile.WriteString(` +measurement,time_idle +cpu,42 +cpu,42 +`) + require.NoError(t, err) + + plugin := NewTail() + plugin.FromBeginning = true + plugin.Files = []string{tmpfile.Name()} + plugin.SetParserFunc(func() (parsers.Parser, error) { + return &csv.Parser{ + MeasurementColumn: "measurement", + HeaderRowCount: 1, + TimeFunc: func() time.Time { return time.Unix(0, 0) }, + }, nil + }) + defer plugin.Stop() + + acc := testutil.Accumulator{} + err = plugin.Start(&acc) + require.NoError(t, err) + err = plugin.Gather(&acc) + require.NoError(t, err) + acc.Wait(2) + plugin.Stop() + + expected := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "path": tmpfile.Name(), + }, + map[string]interface{}{ + "time_idle": 42, + "measurement": "cpu", + }, + time.Unix(0, 0)), + testutil.MustMetric("cpu", + map[string]string{ + "path": tmpfile.Name(), + }, + map[string]interface{}{ + "time_idle": 42, + "measurement": "cpu", + }, + time.Unix(0, 0)), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +// Ensure that the first line can produce multiple metrics (#6138) +func TestMultipleMetricsOnFirstLine(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "") + require.NoError(t, err) + defer func() { + tmpfile.Close() + os.Remove(tmpfile.Name()) + }() + + _, err = tmpfile.WriteString(` +[{"time_idle": 42}, {"time_idle": 42}] +`) + require.NoError(t, err) + + plugin := NewTail() + plugin.FromBeginning = true + plugin.Files = []string{tmpfile.Name()} + plugin.SetParserFunc(func() (parsers.Parser, error) { + return json.New( + &json.Config{ + MetricName: "cpu", + }) + }) + defer plugin.Stop() + + acc := testutil.Accumulator{} + err = plugin.Start(&acc) + require.NoError(t, err) + err = plugin.Gather(&acc) + require.NoError(t, err) + acc.Wait(2) + plugin.Stop() + + expected := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "path": tmpfile.Name(), + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0)), + testutil.MustMetric("cpu", + map[string]string{ + "path": tmpfile.Name(), + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0)), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), + testutil.IgnoreTime()) +}