Fix cleanup of csv parser options, use per file parser (#4712)

This commit is contained in:
Daniel Nelson 2018-09-18 09:23:45 -07:00 committed by GitHub
parent 1d76343422
commit b5299f4cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 160 additions and 53 deletions

View File

@ -855,6 +855,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
t.SetParser(parser) t.SetParser(parser)
} }
switch t := input.(type) {
case parsers.ParserFuncInput:
config, err := getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(config)
})
}
pluginConfig, err := buildInput(name, table) pluginConfig, err := buildInput(name, table)
if err != nil { if err != nil {
return err return err
@ -1212,6 +1223,14 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
// a parsers.Parser object, and creates it, which can then be added onto // a parsers.Parser object, and creates it, which can then be added onto
// an Input object. // an Input object.
func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
config, err := getParserConfig(name, tbl)
if err != nil {
return nil, err
}
return parsers.NewParser(config)
}
func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
c := &parsers.Config{} c := &parsers.Config{}
if node, ok := tbl.Fields["data_format"]; ok { if node, ok := tbl.Fields["data_format"]; ok {
@ -1510,12 +1529,12 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
if node, ok := tbl.Fields["csv_header_row_count"]; ok { if node, ok := tbl.Fields["csv_header_row_count"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if integer, ok := kv.Value.(*ast.Integer); ok {
iVal, err := strconv.Atoi(str.Value) v, err := integer.Int()
c.CSVHeaderRowCount = iVal
if err != nil { if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err) return nil, err
} }
c.CSVHeaderRowCount = int(v)
} }
} }
} }
@ -1583,16 +1602,19 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "grok_custom_patterns") delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files") delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone") delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_data_columns") delete(tbl.Fields, "csv_column_names")
delete(tbl.Fields, "csv_tag_columns") delete(tbl.Fields, "csv_comment")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_field_columns") delete(tbl.Fields, "csv_field_columns")
delete(tbl.Fields, "csv_name_column") delete(tbl.Fields, "csv_header_row_count")
delete(tbl.Fields, "csv_measurement_column")
delete(tbl.Fields, "csv_skip_columns")
delete(tbl.Fields, "csv_skip_rows")
delete(tbl.Fields, "csv_tag_columns")
delete(tbl.Fields, "csv_timestamp_column") delete(tbl.Fields, "csv_timestamp_column")
delete(tbl.Fields, "csv_timestamp_format") delete(tbl.Fields, "csv_timestamp_format")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_header")
return parsers.NewParser(c) return c, nil
} }
// buildSerializer grabs the necessary entries from the ast.Table for creating // buildSerializer grabs the necessary entries from the ast.Table for creating

View File

@ -191,15 +191,13 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
Poll: poll, Poll: poll,
Logger: tail.DiscardingLogger, Logger: tail.DiscardingLogger,
}) })
//add message saying a new tailer was added for the file
log.Printf("D! tail added for file: %v", file)
if err != nil { if err != nil {
l.acc.AddError(err) l.acc.AddError(err)
continue continue
} }
log.Printf("D! [inputs.logparser] tail added for file: %v", file)
// create a goroutine for each "tailer" // create a goroutine for each "tailer"
l.wg.Add(1) l.wg.Add(1)
go l.receiver(tailer) go l.receiver(tailer)

View File

@ -4,6 +4,7 @@ package tail
import ( import (
"fmt" "fmt"
"log"
"strings" "strings"
"sync" "sync"
@ -26,7 +27,7 @@ type Tail struct {
WatchMethod string WatchMethod string
tailers map[string]*tail.Tail tailers map[string]*tail.Tail
parser parsers.Parser parserFunc parsers.ParserFunc
wg sync.WaitGroup wg sync.WaitGroup
acc telegraf.Accumulator acc telegraf.Accumulator
@ -130,10 +131,18 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
t.acc.AddError(err) t.acc.AddError(err)
continue continue
} }
log.Printf("D! [inputs.tail] tail added for file: %v", file)
parser, err := t.parserFunc()
if err != nil {
t.acc.AddError(fmt.Errorf("error creating parser: %v", err))
}
// create a goroutine for each "tailer" // create a goroutine for each "tailer"
t.wg.Add(1) t.wg.Add(1)
go t.receiver(tailer) go t.receiver(parser, tailer)
t.tailers[file] = tailer t.tailers[tailer.Filename] = tailer
} }
} }
return nil return nil
@ -141,9 +150,11 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
// this is launched as a goroutine to continuously watch a tailed logfile // this is launched as a goroutine to continuously watch a tailed logfile
// for changes, parse any incoming msgs, and add to the accumulator. // for changes, parse any incoming msgs, and add to the accumulator.
func (t *Tail) receiver(tailer *tail.Tail) { func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
defer t.wg.Done() defer t.wg.Done()
var firstLine = true
var metrics []telegraf.Metric
var m telegraf.Metric var m telegraf.Metric
var err error var err error
var line *tail.Line var line *tail.Line
@ -156,7 +167,21 @@ func (t *Tail) receiver(tailer *tail.Tail) {
// Fix up files with Windows line endings. // Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r") text := strings.TrimRight(line.Text, "\r")
m, err = t.parser.ParseLine(text) if firstLine {
metrics, err = parser.Parse([]byte(text))
if err == nil {
if len(metrics) == 0 {
firstLine = false
continue
} else {
m = metrics[0]
}
}
firstLine = false
} else {
m, err = parser.ParseLine(text)
}
if err == nil { if err == nil {
if m != nil { if m != nil {
tags := m.Tags() tags := m.Tags()
@ -168,6 +193,9 @@ func (t *Tail) receiver(tailer *tail.Tail) {
tailer.Filename, line.Text, err)) tailer.Filename, line.Text, err))
} }
} }
log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)
if err := tailer.Err(); err != nil { if err := tailer.Err(); err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err)) tailer.Filename, err))
@ -183,13 +211,16 @@ func (t *Tail) Stop() {
if err != nil { if err != nil {
t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename)) t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
} }
}
for _, tailer := range t.tailers {
tailer.Cleanup() tailer.Cleanup()
} }
t.wg.Wait() t.wg.Wait()
} }
func (t *Tail) SetParser(parser parsers.Parser) { func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
t.parser = parser t.parserFunc = fn
} }
func init() { func init() {

View File

@ -27,8 +27,7 @@ func TestTailFromBeginning(t *testing.T) {
tt := NewTail() tt := NewTail()
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser() tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParser(p)
defer tt.Stop() defer tt.Stop()
defer tmpfile.Close() defer tmpfile.Close()
@ -60,8 +59,7 @@ func TestTailFromEnd(t *testing.T) {
tt := NewTail() tt := NewTail()
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser() tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParser(p)
defer tt.Stop() defer tt.Stop()
defer tmpfile.Close() defer tmpfile.Close()
@ -98,8 +96,7 @@ func TestTailBadLine(t *testing.T) {
tt := NewTail() tt := NewTail()
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser() tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParser(p)
defer tt.Stop() defer tt.Stop()
defer tmpfile.Close() defer tmpfile.Close()
@ -124,8 +121,7 @@ func TestTailDosLineendings(t *testing.T) {
tt := NewTail() tt := NewTail()
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser() tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParser(p)
defer tt.Stop() defer tt.Stop()
defer tmpfile.Close() defer tmpfile.Close()

View File

@ -26,6 +26,11 @@ type Parser struct {
TimestampColumn string TimestampColumn string
TimestampFormat string TimestampFormat string
DefaultTags map[string]string DefaultTags map[string]string
TimeFunc func() time.Time
}
func (p *Parser) SetTimeFunc(fn metric.TimeFunc) {
p.TimeFunc = fn
} }
func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) {
@ -167,7 +172,7 @@ outer:
measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
} }
metricTime := time.Now() metricTime := p.TimeFunc()
if p.TimestampColumn != "" { if p.TimestampColumn != "" {
if recordFields[p.TimestampColumn] == nil { if recordFields[p.TimestampColumn] == nil {
return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn)

View File

@ -6,13 +6,19 @@ import (
"time" "time"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// DefaultTime is a fixed clock for the parser tests: it always reports the
// instant 3600 seconds after the Unix epoch.
var DefaultTime = func() time.Time {
	const (
		fixedSeconds     int64 = 3600
		fixedNanoseconds int64 = 0
	)
	return time.Unix(fixedSeconds, fixedNanoseconds)
}
func TestBasicCSV(t *testing.T) { func TestBasicCSV(t *testing.T) {
p := Parser{ p := Parser{
ColumnNames: []string{"first", "second", "third"}, ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"third"}, TagColumns: []string{"third"},
TimeFunc: DefaultTime,
} }
_, err := p.ParseLine("1.4,true,hi") _, err := p.ParseLine("1.4,true,hi")
@ -23,6 +29,7 @@ func TestHeaderConcatenationCSV(t *testing.T) {
p := Parser{ p := Parser{
HeaderRowCount: 2, HeaderRowCount: 2,
MeasurementColumn: "3", MeasurementColumn: "3",
TimeFunc: DefaultTime,
} }
testCSV := `first,second testCSV := `first,second
1,2,3 1,2,3
@ -38,6 +45,7 @@ func TestHeaderOverride(t *testing.T) {
HeaderRowCount: 1, HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"}, ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third", MeasurementColumn: "third",
TimeFunc: DefaultTime,
} }
testCSV := `line1,line2,line3 testCSV := `line1,line2,line3
3.4,70,test_name` 3.4,70,test_name`
@ -53,6 +61,7 @@ func TestTimestamp(t *testing.T) {
MeasurementColumn: "third", MeasurementColumn: "third",
TimestampColumn: "first", TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM", TimestampFormat: "02/01/06 03:04:05 PM",
TimeFunc: DefaultTime,
} }
testCSV := `line1,line2,line3 testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name 23/05/09 04:05:06 PM,70,test_name
@ -70,6 +79,7 @@ func TestTimestampError(t *testing.T) {
ColumnNames: []string{"first", "second", "third"}, ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third", MeasurementColumn: "third",
TimestampColumn: "first", TimestampColumn: "first",
TimeFunc: DefaultTime,
} }
testCSV := `line1,line2,line3 testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name 23/05/09 04:05:06 PM,70,test_name
@ -83,6 +93,7 @@ func TestQuotedCharacter(t *testing.T) {
HeaderRowCount: 1, HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"}, ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third", MeasurementColumn: "third",
TimeFunc: DefaultTime,
} }
testCSV := `line1,line2,line3 testCSV := `line1,line2,line3
@ -98,6 +109,7 @@ func TestDelimiter(t *testing.T) {
Delimiter: "%", Delimiter: "%",
ColumnNames: []string{"first", "second", "third"}, ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third", MeasurementColumn: "third",
TimeFunc: DefaultTime,
} }
testCSV := `line1%line2%line3 testCSV := `line1%line2%line3
@ -113,6 +125,7 @@ func TestValueConversion(t *testing.T) {
Delimiter: ",", Delimiter: ",",
ColumnNames: []string{"first", "second", "third", "fourth"}, ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value", MetricName: "test_value",
TimeFunc: DefaultTime,
} }
testCSV := `3.3,4,true,hello` testCSV := `3.3,4,true,hello`
@ -142,6 +155,7 @@ func TestSkipComment(t *testing.T) {
Comment: "#", Comment: "#",
ColumnNames: []string{"first", "second", "third", "fourth"}, ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value", MetricName: "test_value",
TimeFunc: DefaultTime,
} }
testCSV := `#3.3,4,true,hello testCSV := `#3.3,4,true,hello
4,9.9,true,name_this` 4,9.9,true,name_this`
@ -164,6 +178,7 @@ func TestTrimSpace(t *testing.T) {
TrimSpace: true, TrimSpace: true,
ColumnNames: []string{"first", "second", "third", "fourth"}, ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value", MetricName: "test_value",
TimeFunc: DefaultTime,
} }
testCSV := ` 3.3, 4, true,hello` testCSV := ` 3.3, 4, true,hello`
@ -185,6 +200,7 @@ func TestSkipRows(t *testing.T) {
SkipRows: 1, SkipRows: 1,
TagColumns: []string{"line1"}, TagColumns: []string{"line1"},
MeasurementColumn: "line3", MeasurementColumn: "line3",
TimeFunc: DefaultTime,
} }
testCSV := `garbage nonsense testCSV := `garbage nonsense
line1,line2,line3 line1,line2,line3
@ -203,6 +219,7 @@ func TestSkipColumns(t *testing.T) {
p := Parser{ p := Parser{
SkipColumns: 1, SkipColumns: 1,
ColumnNames: []string{"line1", "line2"}, ColumnNames: []string{"line1", "line2"},
TimeFunc: DefaultTime,
} }
testCSV := `hello,80,test_name` testCSV := `hello,80,test_name`
@ -219,6 +236,7 @@ func TestSkipColumnsWithHeader(t *testing.T) {
p := Parser{ p := Parser{
SkipColumns: 1, SkipColumns: 1,
HeaderRowCount: 2, HeaderRowCount: 2,
TimeFunc: DefaultTime,
} }
testCSV := `col,col,col testCSV := `col,col,col
1,2,3 1,2,3
@ -229,3 +247,30 @@ func TestSkipColumnsWithHeader(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields()) require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields())
} }
// TestParseStream feeds the parser a header row through Parse and then a
// data row through ParseLine, and checks that the data row is reported using
// the column names taken from the earlier header.
func TestParseStream(t *testing.T) {
	p := Parser{
		MetricName:     "csv",
		HeaderRowCount: 1,
		TimeFunc:       DefaultTime,
	}

	csvHeader := "a,b,c"
	csvBody := "1,2,3"

	// The header row on its own must parse cleanly and yield no metrics.
	metrics, err := p.Parse([]byte(csvHeader))
	require.NoError(t, err)
	require.Len(t, metrics, 0)

	// Check the ParseLine error explicitly so a parse failure is reported as
	// such rather than as a metric mismatch. The result is named m so that it
	// does not shadow the imported metric package.
	m, err := p.ParseLine(csvBody)
	require.NoError(t, err)
	testutil.RequireMetricEqual(t,
		testutil.MustMetric(
			"csv",
			map[string]string{},
			map[string]interface{}{
				"a": int64(1),
				"b": int64(2),
				"c": int64(3),
			},
			DefaultTime(),
		), m)
}

View File

@ -2,6 +2,7 @@ package parsers
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -18,6 +19,8 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/wavefront" "github.com/influxdata/telegraf/plugins/parsers/wavefront"
) )
// ParserFunc is a function that takes no arguments and returns a Parser, or
// an error if the Parser could not be provided.
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse // ParserInput is an interface for input plugins that are able to parse
// arbitrary data formats. // arbitrary data formats.
type ParserInput interface { type ParserInput interface {
@ -25,6 +28,13 @@ type ParserInput interface {
SetParser(parser Parser) SetParser(parser Parser)
} }
// ParserFuncInput is an interface for input plugins that are able to parse
// arbitrary data formats and that are configured with a ParserFunc instead
// of a single Parser instance.
type ParserFuncInput interface {
// SetParserFunc hands the plugin the ParserFunc it can call to obtain a
// Parser.
SetParserFunc(fn ParserFunc)
}
// Parser is an interface defining functions that a parser plugin must satisfy. // Parser is an interface defining functions that a parser plugin must satisfy.
type Parser interface { type Parser interface {
// Parse takes a byte buffer separated by newlines // Parse takes a byte buffer separated by newlines
@ -116,17 +126,17 @@ type Config struct {
GrokTimeZone string GrokTimeZone string
//csv configuration //csv configuration
CSVDelimiter string CSVColumnNames []string `toml:"csv_column_names"`
CSVComment string CSVComment string `toml:"csv_comment"`
CSVTrimSpace bool CSVDelimiter string `toml:"csv_delimiter"`
CSVColumnNames []string CSVHeaderRowCount int `toml:"csv_header_row_count"`
CSVTagColumns []string CSVMeasurementColumn string `toml:"csv_measurement_column"`
CSVMeasurementColumn string CSVSkipColumns int `toml:"csv_skip_columns"`
CSVTimestampColumn string CSVSkipRows int `toml:"csv_skip_rows"`
CSVTimestampFormat string CSVTagColumns []string `toml:"csv_tag_columns"`
CSVHeaderRowCount int CSVTimestampColumn string `toml:"csv_timestamp_column"`
CSVSkipRows int CSVTimestampFormat string `toml:"csv_timestamp_format"`
CSVSkipColumns int CSVTrimSpace bool `toml:"csv_trim_space"`
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@ -199,28 +209,27 @@ func NewParser(config *Config) (Parser, error) {
} }
func newCSVParser(metricName string, func newCSVParser(metricName string,
header int, headerRowCount int,
skipRows int, skipRows int,
skipColumns int, skipColumns int,
delimiter string, delimiter string,
comment string, comment string,
trimSpace bool, trimSpace bool,
dataColumns []string, columnNames []string,
tagColumns []string, tagColumns []string,
nameColumn string, nameColumn string,
timestampColumn string, timestampColumn string,
timestampFormat string, timestampFormat string,
defaultTags map[string]string) (Parser, error) { defaultTags map[string]string) (Parser, error) {
if header == 0 && len(dataColumns) == 0 { if headerRowCount == 0 && len(columnNames) == 0 {
// if there is no header and no DataColumns, that's an error return nil, fmt.Errorf("there must be a header if `csv_column_names` is not specified")
return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified")
} }
if delimiter != "" { if delimiter != "" {
runeStr := []rune(delimiter) runeStr := []rune(delimiter)
if len(runeStr) > 1 { if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter) return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", delimiter)
} }
delimiter = fmt.Sprintf("%v", runeStr[0]) delimiter = fmt.Sprintf("%v", runeStr[0])
} }
@ -228,25 +237,26 @@ func newCSVParser(metricName string,
if comment != "" { if comment != "" {
runeStr := []rune(comment) runeStr := []rune(comment)
if len(runeStr) > 1 { if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", comment) return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", comment)
} }
comment = fmt.Sprintf("%v", runeStr[0]) comment = fmt.Sprintf("%v", runeStr[0])
} }
parser := &csv.Parser{ parser := &csv.Parser{
MetricName: metricName, MetricName: metricName,
HeaderRowCount: header, HeaderRowCount: headerRowCount,
SkipRows: skipRows, SkipRows: skipRows,
SkipColumns: skipColumns, SkipColumns: skipColumns,
Delimiter: delimiter, Delimiter: delimiter,
Comment: comment, Comment: comment,
TrimSpace: trimSpace, TrimSpace: trimSpace,
ColumnNames: dataColumns, ColumnNames: columnNames,
TagColumns: tagColumns, TagColumns: tagColumns,
MeasurementColumn: nameColumn, MeasurementColumn: nameColumn,
TimestampColumn: timestampColumn, TimestampColumn: timestampColumn,
TimestampFormat: timestampFormat, TimestampFormat: timestampFormat,
DefaultTags: defaultTags, DefaultTags: defaultTags,
TimeFunc: time.Now,
} }
return parser, nil return parser, nil