Fix parsing multiple metrics on the first line of tailed file (#6289)
This commit is contained in:
parent
10671d2641
commit
8c2b3addd3
|
@ -9,11 +9,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/influxdata/tail"
|
"github.com/influxdata/tail"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal/globpath"
|
"github.com/influxdata/telegraf/internal/globpath"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -172,55 +172,64 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
|
||||||
|
|
||||||
// create a goroutine for each "tailer"
|
// create a goroutine for each "tailer"
|
||||||
t.wg.Add(1)
|
t.wg.Add(1)
|
||||||
go t.receiver(parser, tailer)
|
go func() {
|
||||||
|
defer t.wg.Done()
|
||||||
|
t.receiver(parser, tailer)
|
||||||
|
}()
|
||||||
t.tailers[tailer.Filename] = tailer
|
t.tailers[tailer.Filename] = tailer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is launched as a goroutine to continuously watch a tailed logfile
|
// ParseLine parses a line of text.
|
||||||
|
func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) {
|
||||||
|
switch parser.(type) {
|
||||||
|
case *csv.Parser:
|
||||||
|
// The csv parser parses headers in Parse and skips them in ParseLine.
|
||||||
|
// As a temporary solution call Parse only when getting the first
|
||||||
|
// line from the file.
|
||||||
|
if firstLine {
|
||||||
|
return parser.Parse([]byte(line))
|
||||||
|
} else {
|
||||||
|
m, err := parser.ParseLine(line)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if m != nil {
|
||||||
|
return []telegraf.Metric{m}, nil
|
||||||
|
}
|
||||||
|
return []telegraf.Metric{}, nil
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return parser.Parse([]byte(line))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Receiver is launched as a goroutine to continuously watch a tailed logfile
|
||||||
// for changes, parse any incoming msgs, and add to the accumulator.
|
// for changes, parse any incoming msgs, and add to the accumulator.
|
||||||
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
|
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
|
||||||
defer t.wg.Done()
|
|
||||||
|
|
||||||
var firstLine = true
|
var firstLine = true
|
||||||
var metrics []telegraf.Metric
|
for line := range tailer.Lines {
|
||||||
var m telegraf.Metric
|
|
||||||
var err error
|
|
||||||
var line *tail.Line
|
|
||||||
for line = range tailer.Lines {
|
|
||||||
if line.Err != nil {
|
if line.Err != nil {
|
||||||
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
|
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, line.Err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Fix up files with Windows line endings.
|
// Fix up files with Windows line endings.
|
||||||
text := strings.TrimRight(line.Text, "\r")
|
text := strings.TrimRight(line.Text, "\r")
|
||||||
|
|
||||||
if firstLine {
|
metrics, err := parseLine(parser, text, firstLine)
|
||||||
metrics, err = parser.Parse([]byte(text))
|
if err != nil {
|
||||||
if err == nil {
|
|
||||||
if len(metrics) == 0 {
|
|
||||||
firstLine = false
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
m = metrics[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
firstLine = false
|
|
||||||
} else {
|
|
||||||
m, err = parser.ParseLine(text)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
if m != nil {
|
|
||||||
tags := m.Tags()
|
|
||||||
tags["path"] = tailer.Filename
|
|
||||||
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
|
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
|
||||||
tailer.Filename, line.Text, err))
|
tailer.Filename, line.Text, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
firstLine = false
|
||||||
|
|
||||||
|
for _, metric := range metrics {
|
||||||
|
metric.AddTag("path", tailer.Filename)
|
||||||
|
t.acc.AddMetric(metric)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,10 +5,13 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -139,3 +142,117 @@ func TestTailDosLineendings(t *testing.T) {
|
||||||
"usage_idle": float64(200),
|
"usage_idle": float64(200),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The csv parser should only parse the header line once per file.
|
||||||
|
func TestCSVHeadersParsedOnce(t *testing.T) {
|
||||||
|
tmpfile, err := ioutil.TempFile("", "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
tmpfile.Close()
|
||||||
|
os.Remove(tmpfile.Name())
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = tmpfile.WriteString(`
|
||||||
|
measurement,time_idle
|
||||||
|
cpu,42
|
||||||
|
cpu,42
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
plugin := NewTail()
|
||||||
|
plugin.FromBeginning = true
|
||||||
|
plugin.Files = []string{tmpfile.Name()}
|
||||||
|
plugin.SetParserFunc(func() (parsers.Parser, error) {
|
||||||
|
return &csv.Parser{
|
||||||
|
MeasurementColumn: "measurement",
|
||||||
|
HeaderRowCount: 1,
|
||||||
|
TimeFunc: func() time.Time { return time.Unix(0, 0) },
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
|
defer plugin.Stop()
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
err = plugin.Start(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = plugin.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
acc.Wait(2)
|
||||||
|
plugin.Stop()
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"path": tmpfile.Name(),
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42,
|
||||||
|
"measurement": "cpu",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0)),
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"path": tmpfile.Name(),
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42,
|
||||||
|
"measurement": "cpu",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0)),
|
||||||
|
}
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that the first line can produce multiple metrics (#6138)
|
||||||
|
func TestMultipleMetricsOnFirstLine(t *testing.T) {
|
||||||
|
tmpfile, err := ioutil.TempFile("", "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
tmpfile.Close()
|
||||||
|
os.Remove(tmpfile.Name())
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = tmpfile.WriteString(`
|
||||||
|
[{"time_idle": 42}, {"time_idle": 42}]
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
plugin := NewTail()
|
||||||
|
plugin.FromBeginning = true
|
||||||
|
plugin.Files = []string{tmpfile.Name()}
|
||||||
|
plugin.SetParserFunc(func() (parsers.Parser, error) {
|
||||||
|
return json.New(
|
||||||
|
&json.Config{
|
||||||
|
MetricName: "cpu",
|
||||||
|
})
|
||||||
|
})
|
||||||
|
defer plugin.Stop()
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
err = plugin.Start(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = plugin.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
acc.Wait(2)
|
||||||
|
plugin.Stop()
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"path": tmpfile.Name(),
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0)),
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{
|
||||||
|
"path": tmpfile.Name(),
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0)),
|
||||||
|
}
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
|
||||||
|
testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue