diff --git a/internal/config/config.go b/internal/config/config.go index 4382e0cbe..36027834b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1460,6 +1460,18 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) { } } + if node, ok := tbl.Fields["csv_column_types"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVColumnTypes = append(c.CSVColumnTypes, str.Value) + } + } + } + } + } + if node, ok := tbl.Fields["csv_tag_columns"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { @@ -1588,6 +1600,7 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) { delete(tbl.Fields, "grok_custom_pattern_files") delete(tbl.Fields, "grok_timezone") delete(tbl.Fields, "csv_column_names") + delete(tbl.Fields, "csv_column_types") delete(tbl.Fields, "csv_comment") delete(tbl.Fields, "csv_delimiter") delete(tbl.Fields, "csv_field_columns") diff --git a/plugins/parsers/csv/README.md b/plugins/parsers/csv/README.md index 532980991..e4cfbfc37 100644 --- a/plugins/parsers/csv/README.md +++ b/plugins/parsers/csv/README.md @@ -27,6 +27,11 @@ values. ## If `csv_header_row_count` is set to 0, this config must be used csv_column_names = [] + ## For assigning explicit data types to columns. + ## Supported types: "int", "float", "bool", "string". + ## If this is not specified, type conversion will be attempted automatically, trying each of the types above in order. + csv_column_types = [] + ## Indicates the number of rows to skip before looking for header information. 
csv_skip_rows = 0 diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 8e0b8b47e..f18068eb7 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -21,6 +21,7 @@ type Parser struct { Comment string TrimSpace bool ColumnNames []string + ColumnTypes []string TagColumns []string MeasurementColumn string TimestampColumn string @@ -148,6 +149,40 @@ outer: } } + // Try explicit conversion only when column types is defined. + if len(p.ColumnTypes) > 0 { + // Throw error if current column count exceeds defined types. + if i >= len(p.ColumnTypes) { + return nil, fmt.Errorf("column type: column count exceeded") + } + + var val interface{} + var err error + + switch p.ColumnTypes[i] { + case "int": + val, err = strconv.ParseInt(value, 10, 64) + if err != nil { + return nil, fmt.Errorf("column type: parse int error %s", err) + } + case "float": + val, err = strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("column type: parse float error %s", err) + } + case "bool": + val, err = strconv.ParseBool(value) + if err != nil { + return nil, fmt.Errorf("column type: parse bool error %s", err) + } + default: + val = value + } + + recordFields[fieldName] = val + continue + } + // attempt type conversions if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { recordFields[fieldName] = iValue diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index e3668d3ac..eff6f953f 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -147,6 +147,18 @@ func TestValueConversion(t *testing.T) { //deep equal fields require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields()) + + // Test explicit type conversion. 
+ p.ColumnTypes = []string{"float", "int", "bool", "string"} + + metrics, err = p.Parse([]byte(testCSV)) + require.NoError(t, err) + + returnedMetric, err2 = metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0)) + require.NoError(t, err2) + + //deep equal fields + require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields()) } func TestSkipComment(t *testing.T) { diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 8f972fb1b..c3e4b1cbf 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -127,6 +127,7 @@ type Config struct { //csv configuration CSVColumnNames []string `toml:"csv_column_names"` + CSVColumnTypes []string `toml:"csv_column_types"` CSVComment string `toml:"csv_comment"` CSVDelimiter string `toml:"csv_delimiter"` CSVHeaderRowCount int `toml:"csv_header_row_count"` @@ -195,6 +196,7 @@ func NewParser(config *Config) (Parser, error) { config.CSVComment, config.CSVTrimSpace, config.CSVColumnNames, + config.CSVColumnTypes, config.CSVTagColumns, config.CSVMeasurementColumn, config.CSVTimestampColumn, @@ -216,6 +218,7 @@ func newCSVParser(metricName string, comment string, trimSpace bool, columnNames []string, + columnTypes []string, tagColumns []string, nameColumn string, timestampColumn string, @@ -240,6 +243,10 @@ func newCSVParser(metricName string, } } + if len(columnNames) > 0 && len(columnTypes) > 0 && len(columnNames) != len(columnTypes) { + return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") + } + parser := &csv.Parser{ MetricName: metricName, HeaderRowCount: headerRowCount, @@ -249,6 +256,7 @@ func newCSVParser(metricName string, Comment: comment, TrimSpace: trimSpace, ColumnNames: columnNames, + ColumnTypes: columnTypes, TagColumns: tagColumns, MeasurementColumn: nameColumn, TimestampColumn: timestampColumn,