Add new config for csv column explicit type conversion (#4781)
This commit is contained in:
parent
9efe7c12f0
commit
a1f9f63463
|
@ -1460,6 +1460,18 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node, ok := tbl.Fields["csv_column_types"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||||
|
for _, elem := range ary.Value {
|
||||||
|
if str, ok := elem.(*ast.String); ok {
|
||||||
|
c.CSVColumnTypes = append(c.CSVColumnTypes, str.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if node, ok := tbl.Fields["csv_tag_columns"]; ok {
|
if node, ok := tbl.Fields["csv_tag_columns"]; ok {
|
||||||
if kv, ok := node.(*ast.KeyValue); ok {
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
if ary, ok := kv.Value.(*ast.Array); ok {
|
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||||
|
@ -1588,6 +1600,7 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
|
||||||
delete(tbl.Fields, "grok_custom_pattern_files")
|
delete(tbl.Fields, "grok_custom_pattern_files")
|
||||||
delete(tbl.Fields, "grok_timezone")
|
delete(tbl.Fields, "grok_timezone")
|
||||||
delete(tbl.Fields, "csv_column_names")
|
delete(tbl.Fields, "csv_column_names")
|
||||||
|
delete(tbl.Fields, "csv_column_types")
|
||||||
delete(tbl.Fields, "csv_comment")
|
delete(tbl.Fields, "csv_comment")
|
||||||
delete(tbl.Fields, "csv_delimiter")
|
delete(tbl.Fields, "csv_delimiter")
|
||||||
delete(tbl.Fields, "csv_field_columns")
|
delete(tbl.Fields, "csv_field_columns")
|
||||||
|
|
|
@ -27,6 +27,11 @@ values.
|
||||||
## If `csv_header_row_count` is set to 0, this config must be used
|
## If `csv_header_row_count` is set to 0, this config must be used
|
||||||
csv_column_names = []
|
csv_column_names = []
|
||||||
|
|
||||||
|
## For assigning explicit data types to columns.
|
||||||
|
## Supported types: "int", "float", "bool", "string".
|
||||||
|
## If this is not specified, type conversion will be done on the types above.
|
||||||
|
csv_column_types = []
|
||||||
|
|
||||||
## Indicates the number of rows to skip before looking for header information.
|
## Indicates the number of rows to skip before looking for header information.
|
||||||
csv_skip_rows = 0
|
csv_skip_rows = 0
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ type Parser struct {
|
||||||
Comment string
|
Comment string
|
||||||
TrimSpace bool
|
TrimSpace bool
|
||||||
ColumnNames []string
|
ColumnNames []string
|
||||||
|
ColumnTypes []string
|
||||||
TagColumns []string
|
TagColumns []string
|
||||||
MeasurementColumn string
|
MeasurementColumn string
|
||||||
TimestampColumn string
|
TimestampColumn string
|
||||||
|
@ -148,6 +149,40 @@ outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try explicit conversion only when column types is defined.
|
||||||
|
if len(p.ColumnTypes) > 0 {
|
||||||
|
// Throw error if current column count exceeds defined types.
|
||||||
|
if i >= len(p.ColumnTypes) {
|
||||||
|
return nil, fmt.Errorf("column type: column count exceeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
var val interface{}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
switch p.ColumnTypes[i] {
|
||||||
|
case "int":
|
||||||
|
val, err = strconv.ParseInt(value, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("column type: parse int error %s", err)
|
||||||
|
}
|
||||||
|
case "float":
|
||||||
|
val, err = strconv.ParseFloat(value, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("column type: parse float error %s", err)
|
||||||
|
}
|
||||||
|
case "bool":
|
||||||
|
val, err = strconv.ParseBool(value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("column type: parse bool error %s", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
val = value
|
||||||
|
}
|
||||||
|
|
||||||
|
recordFields[fieldName] = val
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// attempt type conversions
|
// attempt type conversions
|
||||||
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||||
recordFields[fieldName] = iValue
|
recordFields[fieldName] = iValue
|
||||||
|
|
|
@ -147,6 +147,18 @@ func TestValueConversion(t *testing.T) {
|
||||||
|
|
||||||
//deep equal fields
|
//deep equal fields
|
||||||
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
|
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
|
||||||
|
|
||||||
|
// Test explicit type conversion.
|
||||||
|
p.ColumnTypes = []string{"float", "int", "bool", "string"}
|
||||||
|
|
||||||
|
metrics, err = p.Parse([]byte(testCSV))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
returnedMetric, err2 = metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
|
||||||
|
require.NoError(t, err2)
|
||||||
|
|
||||||
|
//deep equal fields
|
||||||
|
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSkipComment(t *testing.T) {
|
func TestSkipComment(t *testing.T) {
|
||||||
|
|
|
@ -127,6 +127,7 @@ type Config struct {
|
||||||
|
|
||||||
//csv configuration
|
//csv configuration
|
||||||
CSVColumnNames []string `toml:"csv_column_names"`
|
CSVColumnNames []string `toml:"csv_column_names"`
|
||||||
|
CSVColumnTypes []string `toml:"csv_column_types"`
|
||||||
CSVComment string `toml:"csv_comment"`
|
CSVComment string `toml:"csv_comment"`
|
||||||
CSVDelimiter string `toml:"csv_delimiter"`
|
CSVDelimiter string `toml:"csv_delimiter"`
|
||||||
CSVHeaderRowCount int `toml:"csv_header_row_count"`
|
CSVHeaderRowCount int `toml:"csv_header_row_count"`
|
||||||
|
@ -195,6 +196,7 @@ func NewParser(config *Config) (Parser, error) {
|
||||||
config.CSVComment,
|
config.CSVComment,
|
||||||
config.CSVTrimSpace,
|
config.CSVTrimSpace,
|
||||||
config.CSVColumnNames,
|
config.CSVColumnNames,
|
||||||
|
config.CSVColumnTypes,
|
||||||
config.CSVTagColumns,
|
config.CSVTagColumns,
|
||||||
config.CSVMeasurementColumn,
|
config.CSVMeasurementColumn,
|
||||||
config.CSVTimestampColumn,
|
config.CSVTimestampColumn,
|
||||||
|
@ -216,6 +218,7 @@ func newCSVParser(metricName string,
|
||||||
comment string,
|
comment string,
|
||||||
trimSpace bool,
|
trimSpace bool,
|
||||||
columnNames []string,
|
columnNames []string,
|
||||||
|
columnTypes []string,
|
||||||
tagColumns []string,
|
tagColumns []string,
|
||||||
nameColumn string,
|
nameColumn string,
|
||||||
timestampColumn string,
|
timestampColumn string,
|
||||||
|
@ -240,6 +243,10 @@ func newCSVParser(metricName string,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(columnNames) > 0 && len(columnTypes) > 0 && len(columnNames) != len(columnTypes) {
|
||||||
|
return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
|
||||||
|
}
|
||||||
|
|
||||||
parser := &csv.Parser{
|
parser := &csv.Parser{
|
||||||
MetricName: metricName,
|
MetricName: metricName,
|
||||||
HeaderRowCount: headerRowCount,
|
HeaderRowCount: headerRowCount,
|
||||||
|
@ -249,6 +256,7 @@ func newCSVParser(metricName string,
|
||||||
Comment: comment,
|
Comment: comment,
|
||||||
TrimSpace: trimSpace,
|
TrimSpace: trimSpace,
|
||||||
ColumnNames: columnNames,
|
ColumnNames: columnNames,
|
||||||
|
ColumnTypes: columnTypes,
|
||||||
TagColumns: tagColumns,
|
TagColumns: tagColumns,
|
||||||
MeasurementColumn: nameColumn,
|
MeasurementColumn: nameColumn,
|
||||||
TimestampColumn: timestampColumn,
|
TimestampColumn: timestampColumn,
|
||||||
|
|
Loading…
Reference in New Issue