From b5299f4cc457e4984c54428312a9e93060e6fcaf Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 18 Sep 2018 09:23:45 -0700 Subject: [PATCH] Fix cleanup of csv parser options, use per file parser (#4712) --- internal/config/config.go | 42 ++++++++++++++++------ plugins/inputs/logparser/logparser.go | 6 ++-- plugins/inputs/tail/tail.go | 51 +++++++++++++++++++++------ plugins/inputs/tail/tail_test.go | 12 +++---- plugins/parsers/csv/parser.go | 7 +++- plugins/parsers/csv/parser_test.go | 45 +++++++++++++++++++++++ plugins/parsers/registry.go | 50 +++++++++++++++----------- 7 files changed, 160 insertions(+), 53 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 2208268d2..c613244fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -855,6 +855,17 @@ func (c *Config) addInput(name string, table *ast.Table) error { t.SetParser(parser) } + switch t := input.(type) { + case parsers.ParserFuncInput: + config, err := getParserConfig(name, table) + if err != nil { + return err + } + t.SetParserFunc(func() (parsers.Parser, error) { + return parsers.NewParser(config) + }) + } + pluginConfig, err := buildInput(name, table) if err != nil { return err @@ -1212,6 +1223,14 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { // a parsers.Parser object, and creates it, which can then be added onto // an Input object. func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { + config, err := getParserConfig(name, tbl) + if err != nil { + return nil, err + } + return parsers.NewParser(config) +} + +func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) { c := &parsers.Config{} if node, ok := tbl.Fields["data_format"]; ok { @@ -1510,12 +1529,12 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_header_row_count"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - iVal, err := strconv.Atoi(str.Value) - c.CSVHeaderRowCount = iVal + if integer, ok := kv.Value.(*ast.Integer); ok { + v, err := integer.Int() if err != nil { - return nil, fmt.Errorf("E! parsing to int: %v", err) + return nil, err } + c.CSVHeaderRowCount = int(v) } } } @@ -1583,16 +1602,19 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "grok_custom_patterns") delete(tbl.Fields, "grok_custom_pattern_files") delete(tbl.Fields, "grok_timezone") - delete(tbl.Fields, "csv_data_columns") - delete(tbl.Fields, "csv_tag_columns") + delete(tbl.Fields, "csv_column_names") + delete(tbl.Fields, "csv_comment") + delete(tbl.Fields, "csv_delimiter") delete(tbl.Fields, "csv_field_columns") - delete(tbl.Fields, "csv_name_column") + delete(tbl.Fields, "csv_header_row_count") + delete(tbl.Fields, "csv_measurement_column") + delete(tbl.Fields, "csv_skip_columns") + delete(tbl.Fields, "csv_skip_rows") + delete(tbl.Fields, "csv_tag_columns") delete(tbl.Fields, "csv_timestamp_column") delete(tbl.Fields, "csv_timestamp_format") - delete(tbl.Fields, "csv_delimiter") - delete(tbl.Fields, "csv_header") - return parsers.NewParser(c) + return c, nil } // buildSerializer grabs the necessary entries from the ast.Table for creating diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index bdfa4bacc..d52df3aa9 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -191,15 +191,13 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { Poll: poll, Logger: tail.DiscardingLogger, }) - - //add message saying a new tailer was added for the file - log.Printf("D! tail added for file: %v", file) - if err != nil { l.acc.AddError(err) continue } + log.Printf("D! [inputs.logparser] tail added for file: %v", file) + // create a goroutine for each "tailer" l.wg.Add(1) go l.receiver(tailer) diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index ad3d713f3..cdea675e0 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -4,6 +4,7 @@ package tail import ( "fmt" + "log" "strings" "sync" @@ -25,10 +26,10 @@ type Tail struct { Pipe bool WatchMethod string - tailers map[string]*tail.Tail - parser parsers.Parser - wg sync.WaitGroup - acc telegraf.Accumulator + tailers map[string]*tail.Tail + parserFunc parsers.ParserFunc + wg sync.WaitGroup + acc telegraf.Accumulator sync.Mutex } @@ -130,10 +131,18 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { t.acc.AddError(err) continue } + + log.Printf("D! [inputs.tail] tail added for file: %v", file) + + parser, err := t.parserFunc() + if err != nil { + t.acc.AddError(fmt.Errorf("error creating parser: %v", err)) + } + // create a goroutine for each "tailer" t.wg.Add(1) - go t.receiver(tailer) - t.tailers[file] = tailer + go t.receiver(parser, tailer) + t.tailers[tailer.Filename] = tailer } } return nil @@ -141,9 +150,11 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { // this is launched as a goroutine to continuously watch a tailed logfile // for changes, parse any incoming msgs, and add to the accumulator. -func (t *Tail) receiver(tailer *tail.Tail) { +func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { defer t.wg.Done() + var firstLine = true + var metrics []telegraf.Metric var m telegraf.Metric var err error var line *tail.Line @@ -156,7 +167,21 @@ func (t *Tail) receiver(tailer *tail.Tail) { // Fix up files with Windows line endings. text := strings.TrimRight(line.Text, "\r") - m, err = t.parser.ParseLine(text) + if firstLine { + metrics, err = parser.Parse([]byte(text)) + if err == nil { + if len(metrics) == 0 { + firstLine = false + continue + } else { + m = metrics[0] + } + } + firstLine = false + } else { + m, err = parser.ParseLine(text) + } + if err == nil { if m != nil { tags := m.Tags() @@ -168,6 +193,9 @@ func (t *Tail) receiver(tailer *tail.Tail) { tailer.Filename, line.Text, err)) } } + + log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename) + if err := tailer.Err(); err != nil { t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", tailer.Filename, err)) @@ -183,13 +211,16 @@ func (t *Tail) Stop() { if err != nil { t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename)) } + } + + for _, tailer := range t.tailers { tailer.Cleanup() } t.wg.Wait() } -func (t *Tail) SetParser(parser parsers.Parser) { - t.parser = parser +func (t *Tail) SetParserFunc(fn parsers.ParserFunc) { + t.parserFunc = fn } func init() { diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 23df0d0b8..06db2c172 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -27,8 +27,7 @@ func TestTailFromBeginning(t *testing.T) { tt := NewTail() tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - p, _ := parsers.NewInfluxParser() - tt.SetParser(p) + tt.SetParserFunc(parsers.NewInfluxParser) defer tt.Stop() defer tmpfile.Close() @@ -60,8 +59,7 @@ func TestTailFromEnd(t *testing.T) { tt := NewTail() tt.Files = []string{tmpfile.Name()} - p, _ := parsers.NewInfluxParser() - tt.SetParser(p) + tt.SetParserFunc(parsers.NewInfluxParser) defer tt.Stop() defer tmpfile.Close() @@ -98,8 +96,7 @@ func TestTailBadLine(t *testing.T) { tt := NewTail() tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - p, _ := parsers.NewInfluxParser() - tt.SetParser(p) + tt.SetParserFunc(parsers.NewInfluxParser) defer tt.Stop() defer tmpfile.Close() @@ -124,8 +121,7 @@ func TestTailDosLineendings(t *testing.T) { tt := NewTail() tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - p, _ := parsers.NewInfluxParser() - tt.SetParser(p) + tt.SetParserFunc(parsers.NewInfluxParser) defer tt.Stop() defer tmpfile.Close() diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 9193fbf5b..8e0b8b47e 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -26,6 +26,11 @@ type Parser struct { TimestampColumn string TimestampFormat string DefaultTags map[string]string + TimeFunc func() time.Time +} + +func (p *Parser) SetTimeFunc(fn metric.TimeFunc) { + p.TimeFunc = fn } func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { @@ -167,7 +172,7 @@ outer: measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) } - metricTime := time.Now() + metricTime := p.TimeFunc() if p.TimestampColumn != "" { if recordFields[p.TimestampColumn] == nil { return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index b488a1f16..e3668d3ac 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -6,13 +6,19 @@ import ( "time" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) +var DefaultTime = func() time.Time { + return time.Unix(3600, 0) +} + func TestBasicCSV(t *testing.T) { p := Parser{ ColumnNames: []string{"first", "second", "third"}, TagColumns: []string{"third"}, + TimeFunc: DefaultTime, } _, err := p.ParseLine("1.4,true,hi") @@ -23,6 +29,7 @@ func TestHeaderConcatenationCSV(t *testing.T) { p := Parser{ HeaderRowCount: 2, MeasurementColumn: "3", + TimeFunc: DefaultTime, } testCSV := `first,second 1,2,3 @@ -38,6 +45,7 @@ func TestHeaderOverride(t *testing.T) { HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", + TimeFunc: DefaultTime, } testCSV := `line1,line2,line3 3.4,70,test_name` @@ -53,6 +61,7 @@ func TestTimestamp(t *testing.T) { MeasurementColumn: "third", TimestampColumn: "first", TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, } testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name @@ -70,6 +79,7 @@ func TestTimestampError(t *testing.T) { ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", TimestampColumn: "first", + TimeFunc: DefaultTime, } testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name @@ -83,6 +93,7 @@ func TestQuotedCharacter(t *testing.T) { HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", + TimeFunc: DefaultTime, } testCSV := `line1,line2,line3 @@ -98,6 +109,7 @@ func TestDelimiter(t *testing.T) { Delimiter: "%", ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", + TimeFunc: DefaultTime, } testCSV := `line1%line2%line3 @@ -113,6 +125,7 @@ func TestValueConversion(t *testing.T) { Delimiter: ",", ColumnNames: []string{"first", "second", "third", "fourth"}, MetricName: "test_value", + TimeFunc: DefaultTime, } testCSV := `3.3,4,true,hello` @@ -142,6 +155,7 @@ func TestSkipComment(t *testing.T) { Comment: "#", ColumnNames: []string{"first", "second", "third", "fourth"}, MetricName: "test_value", + TimeFunc: DefaultTime, } testCSV := `#3.3,4,true,hello 4,9.9,true,name_this` @@ -164,6 +178,7 @@ func TestTrimSpace(t *testing.T) { TrimSpace: true, ColumnNames: []string{"first", "second", "third", "fourth"}, MetricName: "test_value", + TimeFunc: DefaultTime, } testCSV := ` 3.3, 4, true,hello` @@ -185,6 +200,7 @@ func TestSkipRows(t *testing.T) { SkipRows: 1, TagColumns: []string{"line1"}, MeasurementColumn: "line3", + TimeFunc: DefaultTime, } testCSV := `garbage nonsense line1,line2,line3 @@ -203,6 +219,7 @@ func TestSkipColumns(t *testing.T) { p := Parser{ SkipColumns: 1, ColumnNames: []string{"line1", "line2"}, + TimeFunc: DefaultTime, } testCSV := `hello,80,test_name` @@ -219,6 +236,7 @@ func TestSkipColumnsWithHeader(t *testing.T) { p := Parser{ SkipColumns: 1, HeaderRowCount: 2, + TimeFunc: DefaultTime, } testCSV := `col,col,col 1,2,3 @@ -229,3 +247,30 @@ func TestSkipColumnsWithHeader(t *testing.T) { require.NoError(t, err) require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields()) } + +func TestParseStream(t *testing.T) { + p := Parser{ + MetricName: "csv", + HeaderRowCount: 1, + TimeFunc: DefaultTime, + } + + csvHeader := "a,b,c" + csvBody := "1,2,3" + + metrics, err := p.Parse([]byte(csvHeader)) + require.NoError(t, err) + require.Len(t, metrics, 0) + metric, err := p.ParseLine(csvBody) + testutil.RequireMetricEqual(t, + testutil.MustMetric( + "csv", + map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "b": int64(2), + "c": int64(3), + }, + DefaultTime(), + ), metric) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 32027e417..28ff30261 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -2,6 +2,7 @@ package parsers import ( "fmt" + "time" "github.com/influxdata/telegraf" @@ -18,6 +19,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/wavefront" ) +type ParserFunc func() (Parser, error) + // ParserInput is an interface for input plugins that are able to parse // arbitrary data formats. type ParserInput interface { @@ -25,6 +28,13 @@ type ParserInput interface { SetParser(parser Parser) } +// ParserFuncInput is an interface for input plugins that are able to parse +// arbitrary data formats. +type ParserFuncInput interface { + // GetParser returns a new parser. + SetParserFunc(fn ParserFunc) +} + // Parser is an interface defining functions that a parser plugin must satisfy. type Parser interface { // Parse takes a byte buffer separated by newlines @@ -116,17 +126,17 @@ type Config struct { GrokTimeZone string //csv configuration - CSVDelimiter string - CSVComment string - CSVTrimSpace bool - CSVColumnNames []string - CSVTagColumns []string - CSVMeasurementColumn string - CSVTimestampColumn string - CSVTimestampFormat string - CSVHeaderRowCount int - CSVSkipRows int - CSVSkipColumns int + CSVColumnNames []string `toml:"csv_column_names"` + CSVComment string `toml:"csv_comment"` + CSVDelimiter string `toml:"csv_delimiter"` + CSVHeaderRowCount int `toml:"csv_header_row_count"` + CSVMeasurementColumn string `toml:"csv_measurement_column"` + CSVSkipColumns int `toml:"csv_skip_columns"` + CSVSkipRows int `toml:"csv_skip_rows"` + CSVTagColumns []string `toml:"csv_tag_columns"` + CSVTimestampColumn string `toml:"csv_timestamp_column"` + CSVTimestampFormat string `toml:"csv_timestamp_format"` + CSVTrimSpace bool `toml:"csv_trim_space"` } // NewParser returns a Parser interface based on the given config. @@ -199,28 +209,27 @@ func NewParser(config *Config) (Parser, error) { } func newCSVParser(metricName string, - header int, + headerRowCount int, skipRows int, skipColumns int, delimiter string, comment string, trimSpace bool, - dataColumns []string, + columnNames []string, tagColumns []string, nameColumn string, timestampColumn string, timestampFormat string, defaultTags map[string]string) (Parser, error) { - if header == 0 && len(dataColumns) == 0 { - // if there is no header and no DataColumns, that's an error - return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") + if headerRowCount == 0 && len(columnNames) == 0 { + return nil, fmt.Errorf("there must be a header if `csv_column_names` is not specified") } if delimiter != "" { runeStr := []rune(delimiter) if len(runeStr) > 1 { - return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter) + return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", delimiter) } delimiter = fmt.Sprintf("%v", runeStr[0]) } @@ -228,25 +237,26 @@ func newCSVParser(metricName string, if comment != "" { runeStr := []rune(comment) if len(runeStr) > 1 { - return nil, fmt.Errorf("delimiter must be a single character, got: %s", comment) + return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", comment) } comment = fmt.Sprintf("%v", runeStr[0]) } parser := &csv.Parser{ MetricName: metricName, - HeaderRowCount: header, + HeaderRowCount: headerRowCount, SkipRows: skipRows, SkipColumns: skipColumns, Delimiter: delimiter, Comment: comment, TrimSpace: trimSpace, - ColumnNames: dataColumns, + ColumnNames: columnNames, TagColumns: tagColumns, MeasurementColumn: nameColumn, TimestampColumn: timestampColumn, TimestampFormat: timestampFormat, DefaultTags: defaultTags, + TimeFunc: time.Now, } return parser, nil