Add csv parser (#4439)

This commit is contained in:
maxunt 2018-08-24 16:40:41 -07:00 committed by Daniel Nelson
parent 80346b2e93
commit 889745a112
5 changed files with 733 additions and 20 deletions

View File

@ -12,6 +12,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Grok](#grok)
1. [Logfmt](#logfmt)
1. [Wavefront](#wavefront)
1. [CSV](#csv)
Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@ -1038,3 +1039,84 @@ There are no additional configuration options for Wavefront Data Format line-pro
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "wavefront"
```
# CSV
Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and
will read data from the first line. If `csv_header_row_count` is set to anything besides 0, the parser
will extract column names from that many rows at the start of the input. Headers spanning more than
one row will have their names concatenated together. Any unnamed columns will be ignored by the parser.
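For example, with `csv_header_row_count = 2`, the two header rows in
```
col,col,col
1,2,3
```
produce the concatenated column names `col1`, `col2`, and `col3`.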
The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data
to parse. By default, no rows will be skipped.
The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These
columns will not be read out of the header. Naming with `csv_column_names` will begin at the first
column after the skipped ones. By default, no columns are skipped.
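For instance (mirroring the `TestSkipColumns` case below), with `csv_skip_columns = 1` and
`csv_column_names = ["line1", "line2"]`, the input `hello,80,test_name` drops `hello` and yields the
fields `line1=80` and `line2="test_name"`.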
To assign custom column names, the `csv_column_names` config is available. If `csv_column_names`
is used, all columns must be named; any additional columns will be ignored. If `csv_header_row_count`
is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names
extracted from the header.
The `csv_tag_columns` config is available to mark columns as tags on the metric. The name used to
specify a column is the name in the header or, if specified, the corresponding name assigned in
`csv_column_names`. Columns not listed as tags will be added to the metric as fields.
Additional configs are available to dynamically name metrics and set custom timestamps. If the
`csv_measurement_column` config is specified, the parser will assign the metric name to the value found
in that column. If `csv_timestamp_column` is specified, the parser will extract the timestamp from
that column, and `csv_timestamp_format` must also be specified or an error will be returned.
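Because the timestamp is parsed with Go's `time.Parse`, `csv_timestamp_format` must be a Go
reference-time layout. The pairing below is illustrative:
```toml
csv_timestamp_column = "time"
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
```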
#### CSV Configuration
```toml
data_format = "csv"
## Indicates how many rows to treat as a header. By default, the parser assumes
## there is no header and will parse the first row as data. If set to anything more
## than 1, column names will be concatenated across the header rows.
## If `csv_column_names` is specified, the names extracted from the header will be overridden.
# csv_header_row_count = 0
## Indicates the number of rows to skip before looking for header information.
# csv_skip_rows = 0
## Indicates the number of columns to skip before looking for data to parse.
## These columns will be skipped in the header as well.
# csv_skip_columns = 0
## The separator between csv fields
## By default, the parser assumes a comma (",")
# csv_delimiter = ","
## The character reserved for marking a row as a comment row
## Commented rows are skipped and not parsed
# csv_comment = ""
## If set to true, the parser will remove leading whitespace from fields
## By default, this is false
# csv_trim_space = false
## For assigning custom names to columns
## If this is specified, all columns should have a name
## Unnamed columns will be ignored by the parser.
## If `csv_header_row_count` is set to 0, this config must be used
csv_column_names = []
## Columns listed here will be added as tags. Any other columns
## will be added as fields.
csv_tag_columns = []
## The column to extract the name of the metric from
## By default, this is the name of the plugin;
## the `name_override` config overrides this
# csv_measurement_column = ""
## The column to extract the metric's timestamp from
## `csv_timestamp_format` must be specified if this is used
# csv_timestamp_column = ""
## The format of time data extracted from `csv_timestamp_column`
## This must be specified if `csv_timestamp_column` is specified
# csv_timestamp_format = ""
```
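As an end-to-end illustration (the column names and values here are hypothetical, not required by the
parser), the following configuration:
```toml
data_format = "csv"
csv_header_row_count = 1
csv_tag_columns = ["host"]
csv_measurement_column = "measurement"
csv_timestamp_column = "time"
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
```
applied to the input
```
measurement,host,cpu_usage,time
cpu,server01,42.5,2018-08-24T16:40:41Z
```
produces a metric named `cpu` with the tag `host=server01`, the field `cpu_usage=42.5`, and a
timestamp taken from the `time` column. Note that in this version of the parser the measurement and
timestamp column values are also retained as fields.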

View File

@ -1443,6 +1443,120 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
// csv parser configuration options
if node, ok := tbl.Fields["csv_column_names"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVColumnNames = append(c.CSVColumnNames, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["csv_tag_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVTagColumns = append(c.CSVTagColumns, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["csv_delimiter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVDelimiter = str.Value
}
}
}
if node, ok := tbl.Fields["csv_comment"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVComment = str.Value
}
}
}
if node, ok := tbl.Fields["csv_measurement_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVMeasurementColumn = str.Value
}
}
}
if node, ok := tbl.Fields["csv_timestamp_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampColumn = str.Value
}
}
}
if node, ok := tbl.Fields["csv_timestamp_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampFormat = str.Value
}
}
}
if node, ok := tbl.Fields["csv_header_row_count"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVHeaderRowCount = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}
if node, ok := tbl.Fields["csv_skip_rows"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipRows = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}
if node, ok := tbl.Fields["csv_skip_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipColumns = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}
if node, ok := tbl.Fields["csv_trim_space"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.Boolean); ok {
//for config with no quotes
val, err := strconv.ParseBool(str.Value)
c.CSVTrimSpace = val
if err != nil {
return nil, fmt.Errorf("E! parsing to bool: %v", err)
}
}
}
}
c.MetricName = name
delete(tbl.Fields, "data_format")
@ -1469,6 +1583,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_data_columns")
delete(tbl.Fields, "csv_tag_columns")
delete(tbl.Fields, "csv_field_columns")
delete(tbl.Fields, "csv_name_column")
delete(tbl.Fields, "csv_timestamp_column")
delete(tbl.Fields, "csv_timestamp_format")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_header")
return parsers.NewParser(c)
}

View File

@ -0,0 +1,196 @@
package csv
import (
"bytes"
"encoding/csv"
"fmt"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
type Parser struct {
MetricName string
HeaderRowCount int
SkipRows int
SkipColumns int
Delimiter string
Comment string
TrimSpace bool
ColumnNames []string
TagColumns []string
MeasurementColumn string
TimestampColumn string
TimestampFormat string
DefaultTags map[string]string
}
func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) {
csvReader := csv.NewReader(r)
// ensures that the reader reads records of different lengths without an error
csvReader.FieldsPerRecord = -1
if p.Delimiter != "" {
csvReader.Comma = []rune(p.Delimiter)[0]
}
if p.Comment != "" {
csvReader.Comment = []rune(p.Comment)[0]
}
return csvReader, nil
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
r := bytes.NewReader(buf)
csvReader, err := p.compile(r)
if err != nil {
return nil, err
}
// skip first rows
for i := 0; i < p.SkipRows; i++ {
csvReader.Read()
}
// if there is a header and nothing in ColumnNames,
// set ColumnNames to names extracted from the header
headerNames := make([]string, 0)
if len(p.ColumnNames) == 0 {
for i := 0; i < p.HeaderRowCount; i++ {
header, err := csvReader.Read()
if err != nil {
return nil, err
}
//concatenate header names
for i := range header {
name := header[i]
if p.TrimSpace {
name = strings.Trim(name, " ")
}
if len(headerNames) <= i {
headerNames = append(headerNames, name)
} else {
headerNames[i] = headerNames[i] + name
}
}
}
p.ColumnNames = headerNames[p.SkipColumns:]
} else {
// if columns are named, just skip header rows
for i := 0; i < p.HeaderRowCount; i++ {
csvReader.Read()
}
}
table, err := csvReader.ReadAll()
if err != nil {
return nil, err
}
metrics := make([]telegraf.Metric, 0)
for _, record := range table {
m, err := p.parseRecord(record)
if err != nil {
return metrics, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
// ParseLine does not use any information in the header and assumes ColumnNames is set;
// it will also not skip any rows
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
r := bytes.NewReader([]byte(line))
csvReader, err := p.compile(r)
if err != nil {
return nil, err
}
// if there is nothing in ColumnNames, ParseLine will fail
if len(p.ColumnNames) == 0 {
return nil, fmt.Errorf("[parsers.csv] data columns must be specified")
}
record, err := csvReader.Read()
if err != nil {
return nil, err
}
m, err := p.parseRecord(record)
if err != nil {
return nil, err
}
return m, nil
}
func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
recordFields := make(map[string]interface{})
tags := make(map[string]string)
// skip columns in record
record = record[p.SkipColumns:]
outer:
for i, fieldName := range p.ColumnNames {
if i < len(record) {
value := record[i]
if p.TrimSpace {
value = strings.Trim(value, " ")
}
for _, tagName := range p.TagColumns {
if tagName == fieldName {
tags[tagName] = value
continue outer
}
}
// attempt type conversions
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
recordFields[fieldName] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
recordFields[fieldName] = fValue
} else if bValue, err := strconv.ParseBool(value); err == nil {
recordFields[fieldName] = bValue
} else {
recordFields[fieldName] = value
}
}
}
// add default tags
for k, v := range p.DefaultTags {
tags[k] = v
}
// will default to plugin name
measurementName := p.MetricName
if recordFields[p.MeasurementColumn] != nil {
measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
}
metricTime := time.Now()
if p.TimestampColumn != "" {
if recordFields[p.TimestampColumn] == nil {
return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn)
}
tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn])
if p.TimestampFormat == "" {
return nil, fmt.Errorf("timestamp format must be specified")
}
var err error
metricTime, err = time.Parse(p.TimestampFormat, tStr)
if err != nil {
return nil, err
}
}
m, err := metric.New(measurementName, tags, recordFields, metricTime)
if err != nil {
return nil, err
}
return m, nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
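A minimal sketch of driving the parser directly (the metric name, header, and column values here are
illustrative, not part of the commit):
```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	p := &csv.Parser{
		MetricName:     "example", // used because no MeasurementColumn is set
		HeaderRowCount: 1,         // the first row holds the column names
		TagColumns:     []string{"host"},
	}
	// "host" becomes a tag; "value" is type-converted to an int64 field
	metrics, err := p.Parse([]byte("host,value\nserver01,42\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
	// prints: example map[host:server01] map[value:42]
}
```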

View File

@ -0,0 +1,231 @@
package csv
import (
"fmt"
"testing"
"time"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
)
func TestBasicCSV(t *testing.T) {
p := Parser{
ColumnNames: []string{"first", "second", "third"},
TagColumns: []string{"third"},
}
_, err := p.ParseLine("1.4,true,hi")
require.NoError(t, err)
}
func TestHeaderConcatenationCSV(t *testing.T) {
p := Parser{
HeaderRowCount: 2,
MeasurementColumn: "3",
}
testCSV := `first,second
1,2,3
3.4,70,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "test_name", metrics[0].Name())
}
func TestHeaderOverride(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
}
testCSV := `line1,line2,line3
3.4,70,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "test_name", metrics[0].Name())
}
func TestTimestamp(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
TimestampFormat: "02/01/06 03:04:05 PM",
}
testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name
07/11/09 04:05:06 PM,80,test_name2`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000))
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000))
}
func TestTimestampError(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
TimestampColumn: "first",
}
testCSV := `line1,line2,line3
23/05/09 04:05:06 PM,70,test_name
07/11/09 04:05:06 PM,80,test_name2`
_, err := p.Parse([]byte(testCSV))
require.Equal(t, fmt.Errorf("timestamp format must be specified"), err)
}
func TestQuotedCharacter(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
}
testCSV := `line1,line2,line3
"3,4",70,test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "3,4", metrics[0].Fields()["first"])
}
func TestDelimiter(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
Delimiter: "%",
ColumnNames: []string{"first", "second", "third"},
MeasurementColumn: "third",
}
testCSV := `line1%line2%line3
3,4%70%test_name`
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, "3,4", metrics[0].Fields()["first"])
}
func TestValueConversion(t *testing.T) {
p := Parser{
HeaderRowCount: 0,
Delimiter: ",",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
}
testCSV := `3.3,4,true,hello`
expectedTags := make(map[string]string)
expectedFields := map[string]interface{}{
"first": 3.3,
"second": 4,
"third": true,
"fourth": "hello",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
expectedMetric, err1 := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0))
returnedMetric, err2 := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
require.NoError(t, err1)
require.NoError(t, err2)
//deep equal fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
}
func TestSkipComment(t *testing.T) {
p := Parser{
HeaderRowCount: 0,
Comment: "#",
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
}
testCSV := `#3.3,4,true,hello
4,9.9,true,name_this`
expectedFields := map[string]interface{}{
"first": int64(4),
"second": 9.9,
"third": true,
"fourth": "name_this",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
func TestTrimSpace(t *testing.T) {
p := Parser{
HeaderRowCount: 0,
TrimSpace: true,
ColumnNames: []string{"first", "second", "third", "fourth"},
MetricName: "test_value",
}
testCSV := ` 3.3, 4, true,hello`
expectedFields := map[string]interface{}{
"first": 3.3,
"second": int64(4),
"third": true,
"fourth": "hello",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
func TestSkipRows(t *testing.T) {
p := Parser{
HeaderRowCount: 1,
SkipRows: 1,
TagColumns: []string{"line1"},
MeasurementColumn: "line3",
}
testCSV := `garbage nonsense
line1,line2,line3
hello,80,test_name2`
expectedFields := map[string]interface{}{
"line2": int64(80),
"line3": "test_name2",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
func TestSkipColumns(t *testing.T) {
p := Parser{
SkipColumns: 1,
ColumnNames: []string{"line1", "line2"},
}
testCSV := `hello,80,test_name`
expectedFields := map[string]interface{}{
"line1": int64(80),
"line2": "test_name",
}
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, expectedFields, metrics[0].Fields())
}
func TestSkipColumnsWithHeader(t *testing.T) {
p := Parser{
SkipColumns: 1,
HeaderRowCount: 2,
}
testCSV := `col,col,col
1,2,3
trash,80,test_name`
// col1 is skipped, so only col2 and col3 should be parsed into fields
metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields())
}

View File

@ -6,6 +6,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
@ -113,6 +114,19 @@ type Config struct {
GrokCustomPatterns string
GrokCustomPatternFiles []string
GrokTimeZone string
// csv configuration
CSVDelimiter string
CSVComment string
CSVTrimSpace bool
CSVColumnNames []string
CSVTagColumns []string
CSVMeasurementColumn string
CSVTimestampColumn string
CSVTimestampFormat string
CSVHeaderRowCount int
CSVSkipRows int
CSVSkipColumns int
}
// NewParser returns a Parser interface based on the given config.
@ -162,6 +176,20 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimeZone)
case "csv":
parser, err = newCSVParser(config.MetricName,
config.CSVHeaderRowCount,
config.CSVSkipRows,
config.CSVSkipColumns,
config.CSVDelimiter,
config.CSVComment,
config.CSVTrimSpace,
config.CSVColumnNames,
config.CSVTagColumns,
config.CSVMeasurementColumn,
config.CSVTimestampColumn,
config.CSVTimestampFormat,
config.DefaultTags)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
default:
@ -170,6 +198,60 @@ func NewParser(config *Config) (Parser, error) {
return parser, err
}
func newCSVParser(metricName string,
header int,
skipRows int,
skipColumns int,
delimiter string,
comment string,
trimSpace bool,
dataColumns []string,
tagColumns []string,
nameColumn string,
timestampColumn string,
timestampFormat string,
defaultTags map[string]string) (Parser, error) {
if header == 0 && len(dataColumns) == 0 {
// if there is no header and no ColumnNames, that's an error
return nil, fmt.Errorf("there must be a header if `csv_column_names` is not specified")
}
if delimiter != "" {
runeStr := []rune(delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter)
}
delimiter = fmt.Sprintf("%v", runeStr[0])
}
if comment != "" {
runeStr := []rune(comment)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", comment)
}
comment = fmt.Sprintf("%v", runeStr[0])
}
parser := &csv.Parser{
MetricName: metricName,
HeaderRowCount: header,
SkipRows: skipRows,
SkipColumns: skipColumns,
Delimiter: delimiter,
Comment: comment,
TrimSpace: trimSpace,
ColumnNames: dataColumns,
TagColumns: tagColumns,
MeasurementColumn: nameColumn,
TimestampColumn: timestampColumn,
TimestampFormat: timestampFormat,
DefaultTags: defaultTags,
}
return parser, nil
}
func newJSONParser(
metricName string,
tagKeys []string,