diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 6e1b6a751..7e57d9657 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -12,6 +12,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Grok](#grok) 1. [Logfmt](#logfmt) 1. [Wavefront](#wavefront) +1. [CSV](#csv) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -107,28 +108,28 @@ but can be overridden using the `name_override` config option. #### JSON Configuration: -The JSON data format supports specifying "tag_keys", "string_keys", and "json_query". -If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level -and any nested lists of the JSON blob. All int and float values are added to fields by default. -If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics. +The JSON data format supports specifying "tag_keys", "string_keys", and "json_query". +If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level +and any nested lists of the JSON blob. All int and float values are added to fields by default. +If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics. If "string_keys" is specified, the string will be added as a field. -The "json_query" configuration is a gjson path to an JSON object or -list of JSON objects. If this path leads to an array of values or -single data point an error will be thrown. If this configuration +The "json_query" configuration is a gjson path to an JSON object or +list of JSON objects. If this path leads to an array of values or +single data point an error will be thrown. If this configuration is specified, only the result of the query will be parsed and returned as metrics. The "json_name_key" configuration specifies the key of the field whos value will be added as the metric name. 
-Object paths are specified using gjson path format, which is denoted by object keys -concatenated with "." to go deeper in nested JSON objects. +Object paths are specified using gjson path format, which is denoted by object keys +concatenated with "." to go deeper in nested JSON objects. Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax -The JSON data format also supports extracting time values through the -config "json_time_key" and "json_time_format". If "json_time_key" is set, -"json_time_format" must be specified. The "json_time_key" describes the -name of the field containing time information. The "json_time_format" +The JSON data format also supports extracting time values through the +config "json_time_key" and "json_time_format". If "json_time_key" is set, +"json_time_format" must be specified. The "json_time_key" describes the +name of the field containing time information. The "json_time_format" must be a recognized Go time format. If there is no year provided, the metrics will have the current year. More info on time formats can be found here: https://golang.org/pkg/time/#Parse @@ -161,8 +162,8 @@ For example, if you had this configuration: ## List of field names to extract from JSON and add as string fields # json_string_fields = [] - ## gjson query path to specify a specific chunk of JSON to be parsed with - ## the above configuration. If not specified, the whole file will be parsed. + ## gjson query path to specify a specific chunk of JSON to be parsed with + ## the above configuration. If not specified, the whole file will be parsed. ## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax # json_query = "" @@ -191,8 +192,8 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` -If the JSON data is an array, then each element of the array is -parsed with the configured settings. 
Each resulting metric will +If the JSON data is an array, then each element of the array is +parsed with the configured settings. Each resulting metric will be output with the same timestamp. For example, if the following configuration: @@ -220,7 +221,7 @@ For example, if the following configuration: ## List of field names to extract from JSON and add as string fields # string_fields = [] - ## gjson query path to specify a specific chunk of JSON to be parsed with + ## gjson query path to specify a specific chunk of JSON to be parsed with ## the above configuration. If not specified, the whole file will be parsed # json_query = "" @@ -264,7 +265,7 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000 exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000 ``` -If you want to only use a specific portion of your JSON, use the "json_query" +If you want to only use a specific portion of your JSON, use the "json_query" configuration to specify a path to a JSON object. For example, with the following config: @@ -288,7 +289,7 @@ For example, with the following config: ## List of field names to extract from JSON and add as string fields string_fields = ["last"] - ## gjson query path to specify a specific chunk of JSON to be parsed with + ## gjson query path to specify a specific chunk of JSON to be parsed with ## the above configuration. If not specified, the whole file will be parsed json_query = "obj.friends" @@ -1038,3 +1039,84 @@ There are no additional configuration options for Wavefront Data Format line-pro ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "wavefront" ``` + +# CSV +Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and +will read data from the first line. If `csv_header_row_count` is set to anything besides 0, the parser +will extract column names from the first number of rows. 
Headers of more than 1 row will have their +names concatenated together. Any unnamed columns will be ignored by the parser. + +The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data +to parse. By default, no rows will be skipped. + +The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These +columns will not be read out of the header. Naming with the `csv_column_names` will begin at the first +parsed column after skipping the indicated columns. By default, no columns are skipped. + +To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names` +config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count` +is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names extracted +from the header. + +The `csv_tag_columns` config is available to mark columns as tags. Any column not listed there is added +to the metric as a field. The name used to specify the column is the name in the header, or if specified, the corresponding +name assigned in `csv_column_names`. + +Additional configs are available to dynamically name metrics and set custom timestamps. If the +`csv_measurement_column` config is specified, the parser will assign the metric name to the value found +in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from +that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified +or an error will be thrown. + +#### CSV Configuration +```toml + data_format = "csv" + + ## Indicates how many rows to treat as a header. By default, the parser assumes + ## there is no header and will parse the first row as data. If set to anything more + ## than 1, column names will be concatenated with the name listed in the next header row. 
+ ## If `csv_column_names` is specified, the column names in header will be overridden. + # csv_header_row_count = 0 + + ## Indicates the number of rows to skip before looking for header information. + # csv_skip_rows = 0 + + ## Indicates the number of columns to skip before looking for data to parse. + ## These columns will be skipped in the header as well. + # csv_skip_columns = 0 + + ## The separator between csv fields + ## By default, the parser assumes a comma (",") + # csv_delimiter = "," + + ## The character reserved for marking a row as a comment row + ## Commented rows are skipped and not parsed + # csv_comment = "" + + ## If set to true, the parser will remove leading and trailing whitespace from fields + ## By default, this is false + # csv_trim_space = false + + ## For assigning custom names to columns + ## If this is specified, all columns should have a name + ## Unnamed columns will be ignored by the parser. + ## If `csv_header_row_count` is set to 0, this config must be used + # csv_column_names = [] + + ## Columns listed here will be added as tags. Any other columns + ## will be added as fields. 
+ csv_tag_columns = [] + + ## The column to extract the name of the metric from + ## By default, this is the name of the plugin + ## the `name_override` config overrides this + # csv_measurement_column = "" + + ## The column to extract time information for the metric + ## `csv_timestamp_format` must be specified if this is used + # csv_timestamp_column = "" + + ## The format of time data extracted from `csv_timestamp_column` + ## this must be specified if `csv_timestamp_column` is specified + # csv_timestamp_format = "" + ``` diff --git a/internal/config/config.go b/internal/config/config.go index 5926f6132..c712af85e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1443,6 +1443,120 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + //for csv parser + if node, ok := tbl.Fields["csv_column_names"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVColumnNames = append(c.CSVColumnNames, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["csv_tag_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVTagColumns = append(c.CSVTagColumns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["csv_delimiter"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVDelimiter = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_comment"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVComment = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_measurement_column"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVMeasurementColumn = str.Value + } + } + } + + if node, ok := 
tbl.Fields["csv_timestamp_column"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVTimestampColumn = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_timestamp_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVTimestampFormat = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_header_row_count"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVHeaderRowCount = iVal + if err != nil { + return nil, fmt.Errorf("E! parsing to int: %v", err) + } + } + } + } + + if node, ok := tbl.Fields["csv_skip_rows"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVSkipRows = iVal + if err != nil { + return nil, fmt.Errorf("E! parsing to int: %v", err) + } + } + } + } + + if node, ok := tbl.Fields["csv_skip_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVSkipColumns = iVal + if err != nil { + return nil, fmt.Errorf("E! parsing to int: %v", err) + } + } + } + } + + if node, ok := tbl.Fields["csv_trim_space"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.Boolean); ok { + //for config with no quotes + val, err := strconv.ParseBool(str.Value) + c.CSVTrimSpace = val + if err != nil { + return nil, fmt.Errorf("E! 
parsing to bool: %v", err) + } + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") @@ -1469,6 +1583,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "grok_custom_patterns") delete(tbl.Fields, "grok_custom_pattern_files") delete(tbl.Fields, "grok_timezone") + delete(tbl.Fields, "csv_data_columns") + delete(tbl.Fields, "csv_tag_columns") + delete(tbl.Fields, "csv_field_columns") + delete(tbl.Fields, "csv_name_column") + delete(tbl.Fields, "csv_timestamp_column") + delete(tbl.Fields, "csv_timestamp_format") + delete(tbl.Fields, "csv_delimiter") + delete(tbl.Fields, "csv_header") return parsers.NewParser(c) } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go new file mode 100644 index 000000000..9193fbf5b --- /dev/null +++ b/plugins/parsers/csv/parser.go @@ -0,0 +1,196 @@ +package csv + +import ( + "bytes" + "encoding/csv" + "fmt" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +type Parser struct { + MetricName string + HeaderRowCount int + SkipRows int + SkipColumns int + Delimiter string + Comment string + TrimSpace bool + ColumnNames []string + TagColumns []string + MeasurementColumn string + TimestampColumn string + TimestampFormat string + DefaultTags map[string]string +} + +func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { + csvReader := csv.NewReader(r) + // ensures that the reader reads records of different lengths without an error + csvReader.FieldsPerRecord = -1 + if p.Delimiter != "" { + csvReader.Comma = []rune(p.Delimiter)[0] + } + if p.Comment != "" { + csvReader.Comment = []rune(p.Comment)[0] + } + return csvReader, nil +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + r := bytes.NewReader(buf) + csvReader, err := p.compile(r) + if err != nil { + return nil, err + } + // skip first rows + for i := 0; i < p.SkipRows; i++ { + csvReader.Read() + } + // if there is 
a header and nothing in DataColumns + // set DataColumns to names extracted from the header + headerNames := make([]string, 0) + if len(p.ColumnNames) == 0 { + for i := 0; i < p.HeaderRowCount; i++ { + header, err := csvReader.Read() + if err != nil { + return nil, err + } + //concatenate header names + for i := range header { + name := header[i] + if p.TrimSpace { + name = strings.Trim(name, " ") + } + if len(headerNames) <= i { + headerNames = append(headerNames, name) + } else { + headerNames[i] = headerNames[i] + name + } + } + } + p.ColumnNames = headerNames[p.SkipColumns:] + } else { + // if columns are named, just skip header rows + for i := 0; i < p.HeaderRowCount; i++ { + csvReader.Read() + } + } + + table, err := csvReader.ReadAll() + if err != nil { + return nil, err + } + + metrics := make([]telegraf.Metric, 0) + for _, record := range table { + m, err := p.parseRecord(record) + if err != nil { + return metrics, err + } + metrics = append(metrics, m) + } + return metrics, nil +} + +// ParseLine does not use any information in header and assumes DataColumns is set +// it will also not skip any rows +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + r := bytes.NewReader([]byte(line)) + csvReader, err := p.compile(r) + if err != nil { + return nil, err + } + + // if there is nothing in DataColumns, ParseLine will fail + if len(p.ColumnNames) == 0 { + return nil, fmt.Errorf("[parsers.csv] data columns must be specified") + } + + record, err := csvReader.Read() + if err != nil { + return nil, err + } + m, err := p.parseRecord(record) + if err != nil { + return nil, err + } + return m, nil +} + +func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { + recordFields := make(map[string]interface{}) + tags := make(map[string]string) + + // skip columns in record + record = record[p.SkipColumns:] +outer: + for i, fieldName := range p.ColumnNames { + if i < len(record) { + value := record[i] + if p.TrimSpace { + value = 
strings.Trim(value, " ") + } + + for _, tagName := range p.TagColumns { + if tagName == fieldName { + tags[tagName] = value + continue outer + } + } + + // attempt type conversions + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + recordFields[fieldName] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + recordFields[fieldName] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + recordFields[fieldName] = bValue + } else { + recordFields[fieldName] = value + } + } + } + + // add default tags + for k, v := range p.DefaultTags { + tags[k] = v + } + + // will default to plugin name + measurementName := p.MetricName + if recordFields[p.MeasurementColumn] != nil { + measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) + } + + metricTime := time.Now() + if p.TimestampColumn != "" { + if recordFields[p.TimestampColumn] == nil { + return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) + } + tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn]) + if p.TimestampFormat == "" { + return nil, fmt.Errorf("timestamp format must be specified") + } + + var err error + metricTime, err = time.Parse(p.TimestampFormat, tStr) + if err != nil { + return nil, err + } + } + + m, err := metric.New(measurementName, tags, recordFields, metricTime) + if err != nil { + return nil, err + } + return m, nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go new file mode 100644 index 000000000..b488a1f16 --- /dev/null +++ b/plugins/parsers/csv/parser_test.go @@ -0,0 +1,231 @@ +package csv + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/require" +) + +func TestBasicCSV(t *testing.T) { + p := Parser{ + ColumnNames: []string{"first", "second", "third"}, + TagColumns: []string{"third"}, + 
} + + _, err := p.ParseLine("1.4,true,hi") + require.NoError(t, err) +} + +func TestHeaderConcatenationCSV(t *testing.T) { + p := Parser{ + HeaderRowCount: 2, + MeasurementColumn: "3", + } + testCSV := `first,second +1,2,3 +3.4,70,test_name` + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "test_name", metrics[0].Name()) +} + +func TestHeaderOverride(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + } + testCSV := `line1,line2,line3 +3.4,70,test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "test_name", metrics[0].Name()) +} + +func TestTimestamp(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + } + testCSV := `line1,line2,line3 +23/05/09 04:05:06 PM,70,test_name +07/11/09 04:05:06 PM,80,test_name2` + metrics, err := p.Parse([]byte(testCSV)) + + require.NoError(t, err) + require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000)) + require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000)) +} + +func TestTimestampError(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + } + testCSV := `line1,line2,line3 +23/05/09 04:05:06 PM,70,test_name +07/11/09 04:05:06 PM,80,test_name2` + _, err := p.Parse([]byte(testCSV)) + require.Equal(t, fmt.Errorf("timestamp format must be specified"), err) +} + +func TestQuotedCharacter(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + } + + testCSV := `line1,line2,line3 +"3,4",70,test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "3,4", 
metrics[0].Fields()["first"]) +} + +func TestDelimiter(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + Delimiter: "%", + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + } + + testCSV := `line1%line2%line3 +3,4%70%test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "3,4", metrics[0].Fields()["first"]) +} + +func TestValueConversion(t *testing.T) { + p := Parser{ + HeaderRowCount: 0, + Delimiter: ",", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := `3.3,4,true,hello` + + expectedTags := make(map[string]string) + expectedFields := map[string]interface{}{ + "first": 3.3, + "second": 4, + "third": true, + "fourth": "hello", + } + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + + expectedMetric, err1 := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0)) + returnedMetric, err2 := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0)) + require.NoError(t, err1) + require.NoError(t, err2) + + //deep equal fields + require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields()) +} + +func TestSkipComment(t *testing.T) { + p := Parser{ + HeaderRowCount: 0, + Comment: "#", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := `#3.3,4,true,hello +4,9.9,true,name_this` + + expectedFields := map[string]interface{}{ + "first": int64(4), + "second": 9.9, + "third": true, + "fourth": "name_this", + } + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func TestTrimSpace(t *testing.T) { + p := Parser{ + HeaderRowCount: 0, + TrimSpace: true, + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := ` 3.3, 4, true,hello` + + expectedFields := map[string]interface{}{ + "first": 3.3, 
+ "second": int64(4), + "third": true, + "fourth": "hello", + } + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func TestSkipRows(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + SkipRows: 1, + TagColumns: []string{"line1"}, + MeasurementColumn: "line3", + } + testCSV := `garbage nonsense +line1,line2,line3 +hello,80,test_name2` + + expectedFields := map[string]interface{}{ + "line2": int64(80), + "line3": "test_name2", + } + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func TestSkipColumns(t *testing.T) { + p := Parser{ + SkipColumns: 1, + ColumnNames: []string{"line1", "line2"}, + } + testCSV := `hello,80,test_name` + + expectedFields := map[string]interface{}{ + "line1": int64(80), + "line2": "test_name", + } + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func TestSkipColumnsWithHeader(t *testing.T) { + p := Parser{ + SkipColumns: 1, + HeaderRowCount: 2, + } + testCSV := `col,col,col + 1,2,3 + trash,80,test_name` + + // we should expect an error if we try to get col1 + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 89fdc9a10..32027e417 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers/collectd" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/grok" @@ -113,6 +114,19 @@ type Config struct { GrokCustomPatterns 
string GrokCustomPatternFiles []string GrokTimeZone string + + //csv configuration + CSVDelimiter string + CSVComment string + CSVTrimSpace bool + CSVColumnNames []string + CSVTagColumns []string + CSVMeasurementColumn string + CSVTimestampColumn string + CSVTimestampFormat string + CSVHeaderRowCount int + CSVSkipRows int + CSVSkipColumns int } // NewParser returns a Parser interface based on the given config. @@ -162,6 +176,20 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatterns, config.GrokCustomPatternFiles, config.GrokTimeZone) + case "csv": + parser, err = newCSVParser(config.MetricName, + config.CSVHeaderRowCount, + config.CSVSkipRows, + config.CSVSkipColumns, + config.CSVDelimiter, + config.CSVComment, + config.CSVTrimSpace, + config.CSVColumnNames, + config.CSVTagColumns, + config.CSVMeasurementColumn, + config.CSVTimestampColumn, + config.CSVTimestampFormat, + config.DefaultTags) case "logfmt": parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) default: @@ -170,6 +198,60 @@ func NewParser(config *Config) (Parser, error) { return parser, err } +func newCSVParser(metricName string, + header int, + skipRows int, + skipColumns int, + delimiter string, + comment string, + trimSpace bool, + dataColumns []string, + tagColumns []string, + nameColumn string, + timestampColumn string, + timestampFormat string, + defaultTags map[string]string) (Parser, error) { + + if header == 0 && len(dataColumns) == 0 { + // if there is no header and no DataColumns, that's an error + return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") + } + + if delimiter != "" { + runeStr := []rune(delimiter) + if len(runeStr) > 1 { + return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter) + } + delimiter = fmt.Sprintf("%v", runeStr[0]) + } + + if comment != "" { + runeStr := []rune(comment) + if len(runeStr) > 1 { + return nil, fmt.Errorf("delimiter must be a single character, got: %s", 
comment) + } + comment = fmt.Sprintf("%v", runeStr[0]) + } + + parser := &csv.Parser{ + MetricName: metricName, + HeaderRowCount: header, + SkipRows: skipRows, + SkipColumns: skipColumns, + Delimiter: delimiter, + Comment: comment, + TrimSpace: trimSpace, + ColumnNames: dataColumns, + TagColumns: tagColumns, + MeasurementColumn: nameColumn, + TimestampColumn: timestampColumn, + TimestampFormat: timestampFormat, + DefaultTags: defaultTags, + } + + return parser, nil +} + func newJSONParser( metricName string, tagKeys []string,