From 7f93911f43b5db0e680a247cdee938a9c945feb0 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 23 May 2018 14:29:57 -0700 Subject: [PATCH] Add converter processor (#4178) --- plugins/processors/all/all.go | 1 + plugins/processors/converter/README.md | 57 +++ plugins/processors/converter/converter.go | 430 ++++++++++++++++ .../processors/converter/converter_test.go | 479 ++++++++++++++++++ 4 files changed, 967 insertions(+) create mode 100644 plugins/processors/converter/README.md create mode 100644 plugins/processors/converter/converter.go create mode 100644 plugins/processors/converter/converter_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index c15f76391..a19e469a9 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/processors/converter" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/regex" diff --git a/plugins/processors/converter/README.md b/plugins/processors/converter/README.md new file mode 100644 index 000000000..69c2cc275 --- /dev/null +++ b/plugins/processors/converter/README.md @@ -0,0 +1,57 @@ +# Converter Processor + +The converter processor is used to change the type of tag or field values. In +addition to changing field types it can convert fields to tags and vis versa. + +Values that cannot be converted are dropped. + +**Note:** When converting tags to fields, take care not to ensure the series is still +uniquely identifiable. Fields with the same series key (measurement + tags) +will overwrite one another. + +### Configuration: +```toml +# Convert values to another metric value type +[processors.converter] + ## Tags to convert + ## + ## The table key determines the target type, and the array of key-values + ## select the keys to convert. The array may contain globs. + ## = [...] + [processors.converter.tags] + string = [] + integer = [] + unsigned = [] + boolean = [] + float = [] + + ## Fields to convert + ## + ## The table key determines the target type, and the array of key-values + ## select the keys to convert. The array may contain globs. + ## = [...] + [processors.converter.fields] + tag = [] + string = [] + integer = [] + unsigned = [] + boolean = [] + float = [] +``` + +### Examples: + +```toml +[processors.converter] + [processors.converter.tags] + string = ["port"] + + [processors.converter.fields] + integer = ["scboard_*"] + tag = ["ParseServerConfigGeneration"] +``` + +```diff +- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerConfigGeneration=3,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 1502489900000000000 ++ apache,server=debian-stretch-apache,ParentServerConfigGeneration=3 port="80",BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i 1502489900000000000 +``` diff --git a/plugins/processors/converter/converter.go b/plugins/processors/converter/converter.go new file mode 100644 index 000000000..50fd195e0 --- /dev/null +++ b/plugins/processors/converter/converter.go @@ -0,0 +1,430 @@ +package converter + +import ( + "fmt" + "log" + "math" + "strconv" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/plugins/processors" +) + +var sampleConfig = ` + ## Tags to convert + ## + ## The table key determines the target type, and the array of key-values + ## select the keys to convert. The array may contain globs. + ## = [...] + [processors.converter.tags] + string = [] + integer = [] + unsigned = [] + boolean = [] + float = [] + + ## Fields to convert + ## + ## The table key determines the target type, and the array of key-values + ## select the keys to convert. The array may contain globs. + ## = [...] + [processors.converter.fields] + tag = [] + string = [] + integer = [] + unsigned = [] + boolean = [] + float = [] +` + +type Conversion struct { + Tag []string `toml:"tag"` + String []string `toml:"string"` + Integer []string `toml:"integer"` + Unsigned []string `toml:"unsigned"` + Boolean []string `toml:"boolean"` + Float []string `toml:"float"` +} + +type Converter struct { + Tags *Conversion `toml:"tags"` + Fields *Conversion `toml:"fields"` + + initialized bool + tagConversions *ConversionFilter + fieldConversions *ConversionFilter +} + +type ConversionFilter struct { + Tag filter.Filter + String filter.Filter + Integer filter.Filter + Unsigned filter.Filter + Boolean filter.Filter + Float filter.Filter +} + +func (p *Converter) SampleConfig() string { + return sampleConfig +} + +func (p *Converter) Description() string { + return "Convert values to another metric value type" +} + +func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + if !p.initialized { + err := p.compile() + if err != nil { + logPrintf("initialization error: %v\n", err) + return metrics + } + } + + for _, metric := range metrics { + p.convertTags(metric) + p.convertFields(metric) + } + return metrics +} + +func (p *Converter) compile() error { + tf, err := compileFilter(p.Tags) + if err != nil { + return err + } + + ff, err := compileFilter(p.Fields) + if err != nil { + return err + } + + if tf == nil && ff == nil { + return fmt.Errorf("no filters found") + } + + p.tagConversions = tf + p.fieldConversions = ff + p.initialized = true + return nil +} + +func compileFilter(conv *Conversion) (*ConversionFilter, error) { + if conv == nil { + return nil, nil + } + + var err error + cf := &ConversionFilter{} + cf.Tag, err = filter.Compile(conv.Tag) + if err != nil { + return nil, err + } + + cf.String, err = filter.Compile(conv.String) + if err != nil { + return nil, err + } + + cf.Integer, err = filter.Compile(conv.Integer) + if err != nil { + return nil, err + } + + cf.Unsigned, err = filter.Compile(conv.Unsigned) + if err != nil { + return nil, err + } + + cf.Boolean, err = filter.Compile(conv.Boolean) + if err != nil { + return nil, err + } + + cf.Float, err = filter.Compile(conv.Float) + if err != nil { + return nil, err + } + + return cf, nil +} + +// convertTags converts tags into fields +func (p *Converter) convertTags(metric telegraf.Metric) { + if p.tagConversions == nil { + return + } + + for key, value := range metric.Tags() { + if p.tagConversions.String != nil && p.tagConversions.String.Match(key) { + metric.RemoveTag(key) + metric.AddField(key, value) + continue + } + + if p.tagConversions.Integer != nil && p.tagConversions.Integer.Match(key) { + v, ok := toInteger(value) + if !ok { + metric.RemoveTag(key) + logPrintf("error converting to integer [%T]: %v\n", value, value) + continue + } + + metric.RemoveTag(key) + metric.AddField(key, v) + } + + if p.tagConversions.Unsigned != nil && p.tagConversions.Unsigned.Match(key) { + v, ok := toUnsigned(value) + if !ok { + metric.RemoveTag(key) + logPrintf("error converting to unsigned [%T]: %v\n", value, value) + continue + } + + metric.RemoveTag(key) + metric.AddField(key, v) + continue + } + + if p.tagConversions.Boolean != nil && p.tagConversions.Boolean.Match(key) { + v, ok := toBool(value) + if !ok { + metric.RemoveTag(key) + logPrintf("error converting to boolean [%T]: %v\n", value, value) + continue + } + + metric.RemoveTag(key) + metric.AddField(key, v) + continue + } + + if p.tagConversions.Float != nil && p.tagConversions.Float.Match(key) { + v, ok := toFloat(value) + if !ok { + metric.RemoveTag(key) + logPrintf("error converting to float [%T]: %v\n", value, value) + continue + } + + metric.RemoveTag(key) + metric.AddField(key, v) + continue + } + } +} + +// convertFields converts fields into tags or other field types +func (p *Converter) convertFields(metric telegraf.Metric) { + if p.fieldConversions == nil { + return + } + + for key, value := range metric.Fields() { + if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) { + v, ok := toString(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to tag [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddTag(key, v) + continue + } + + if p.fieldConversions.Float != nil && p.fieldConversions.Float.Match(key) { + v, ok := toFloat(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to integer [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddField(key, v) + continue + } + + if p.fieldConversions.Integer != nil && p.fieldConversions.Integer.Match(key) { + v, ok := toInteger(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to integer [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddField(key, v) + continue + } + + if p.fieldConversions.Unsigned != nil && p.fieldConversions.Unsigned.Match(key) { + v, ok := toUnsigned(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to unsigned [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddField(key, v) + continue + } + + if p.fieldConversions.Boolean != nil && p.fieldConversions.Boolean.Match(key) { + v, ok := toBool(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to bool [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddField(key, v) + continue + } + + if p.fieldConversions.String != nil && p.fieldConversions.String.Match(key) { + v, ok := toString(value) + if !ok { + metric.RemoveField(key) + logPrintf("error converting to string [%T]: %v\n", value, value) + continue + } + + metric.RemoveField(key) + metric.AddField(key, v) + continue + } + } +} + +func toBool(v interface{}) (bool, bool) { + switch value := v.(type) { + case int64, uint64, float64: + if value != 0 { + return true, true + } else { + return false, false + } + case bool: + return value, true + case string: + result, err := strconv.ParseBool(value) + return result, err == nil + } + return false, false +} + +func toInteger(v interface{}) (int64, bool) { + switch value := v.(type) { + case int64: + return value, true + case uint64: + if value <= uint64(math.MaxInt64) { + return int64(value), true + } else { + return math.MaxInt64, true + } + case float64: + if value < float64(math.MinInt64) { + return math.MinInt64, true + } else if value > float64(math.MaxInt64) { + return math.MaxInt64, true + } else { + return int64(value), true + } + case bool: + if value { + return 1, true + } else { + return 0, true + } + case string: + result, err := strconv.ParseInt(value, 10, 64) + return result, err == nil + } + return 0, false +} + +func toUnsigned(v interface{}) (uint64, bool) { + switch value := v.(type) { + case uint64: + return value, true + case int64: + if value < 0 { + return 0, true + } else { + return uint64(value), true + } + case float64: + if value < 0.0 { + return 0, true + } else if value > float64(math.MaxUint64) { + return math.MaxUint64, true + } else { + return uint64(value), true + } + case bool: + if value { + return 1, true + } else { + return 0, true + } + case string: + result, err := strconv.ParseUint(value, 10, 64) + return result, err == nil + } + return 0, false +} + +func toFloat(v interface{}) (float64, bool) { + switch value := v.(type) { + case int64: + return float64(value), true + case uint64: + return float64(value), true + case float64: + return value, true + case bool: + if value { + return 1.0, true + } else { + return 0.0, true + } + case string: + result, err := strconv.ParseFloat(value, 64) + return result, err == nil + } + return 0.0, false +} + +func toString(v interface{}) (string, bool) { + switch value := v.(type) { + case int64: + return strconv.FormatInt(value, 10), true + case uint64: + return strconv.FormatUint(value, 10), true + case float64: + return strconv.FormatFloat(value, 'f', -1, 64), true + case bool: + return strconv.FormatBool(value), true + case string: + return value, true + } + return "", false +} + +func logPrintf(format string, v ...interface{}) { + log.Printf("D! [processors.converter] "+format, v...) +} + +func init() { + processors.Add("converter", func() telegraf.Processor { + return &Converter{} + }) +} diff --git a/plugins/processors/converter/converter_test.go b/plugins/processors/converter/converter_test.go new file mode 100644 index 000000000..76839760d --- /dev/null +++ b/plugins/processors/converter/converter_test.go @@ -0,0 +1,479 @@ +package converter + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/require" +) + +func Metric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestConverter(t *testing.T) { + tests := []struct { + name string + converter *Converter + input telegraf.Metric + expected telegraf.Metric + }{ + { + name: "empty", + converter: &Converter{}, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "from tag", + converter: &Converter{ + Tags: &Conversion{ + String: []string{"string"}, + Integer: []string{"int"}, + Unsigned: []string{"uint"}, + Boolean: []string{"bool"}, + Float: []string{"float"}, + Tag: []string{"tag"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{ + "float": "42", + "int": "42", + "uint": "42", + "bool": "true", + "string": "howdy", + "tag": "tag", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "tag": "tag", + }, + map[string]interface{}{ + "float": 42.0, + "int": int64(42), + "uint": uint64(42), + "bool": true, + "string": "howdy", + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "from tag unconvertible", + converter: &Converter{ + Tags: &Conversion{ + Integer: []string{"int"}, + Unsigned: []string{"uint"}, + Boolean: []string{"bool"}, + Float: []string{"float"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{ + "float": "a", + "int": "b", + "uint": "c", + "bool": "maybe", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + ), + }, + { + name: "from string field", + converter: &Converter{ + Fields: &Conversion{ + String: []string{"a"}, + Integer: []string{"b"}, + Unsigned: []string{"c"}, + Boolean: []string{"d"}, + Float: []string{"e"}, + Tag: []string{"f"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": "howdy", + "b": "42", + "c": "42", + "d": "true", + "e": "42.0", + "f": "foo", + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "f": "foo", + }, + map[string]interface{}{ + "a": "howdy", + "b": int64(42), + "c": uint64(42), + "d": true, + "e": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "from string field unconvertible", + converter: &Converter{ + Fields: &Conversion{ + Integer: []string{"a"}, + Unsigned: []string{"b"}, + Boolean: []string{"c"}, + Float: []string{"d"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": "a", + "b": "b", + "c": "c", + "d": "d", + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + ), + }, + { + name: "from integer field", + converter: &Converter{ + Fields: &Conversion{ + String: []string{"a"}, + Integer: []string{"b"}, + Unsigned: []string{"c", "negative_uint"}, + Boolean: []string{"d"}, + Float: []string{"e"}, + Tag: []string{"f"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": int64(42), + "b": int64(42), + "c": int64(42), + "d": int64(42), + "e": int64(42), + "f": int64(42), + "negative_uint": int64(-42), + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "f": "42", + }, + map[string]interface{}{ + "a": "42", + "b": int64(42), + "c": uint64(42), + "d": true, + "e": 42.0, + "negative_uint": uint64(0), + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "from unsigned field", + converter: &Converter{ + Fields: &Conversion{ + String: []string{"a"}, + Integer: []string{"b", "overflow_int"}, + Unsigned: []string{"c"}, + Boolean: []string{"d"}, + Float: []string{"e"}, + Tag: []string{"f"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": uint64(42), + "b": uint64(42), + "c": uint64(42), + "d": uint64(42), + "e": uint64(42), + "f": uint64(42), + "overflow_int": uint64(math.MaxUint64), + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "f": "42", + }, + map[string]interface{}{ + "a": "42", + "b": int64(42), + "c": uint64(42), + "d": true, + "e": 42.0, + "overflow_int": int64(math.MaxInt64), + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "out of range for unsigned", + converter: &Converter{ + Fields: &Conversion{ + Unsigned: []string{"a", "b"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": int64(-42), + "b": math.MaxFloat64, + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": uint64(0), + "b": uint64(math.MaxUint64), + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "boolean field", + converter: &Converter{ + Fields: &Conversion{ + String: []string{"a", "af"}, + Integer: []string{"b", "bf"}, + Unsigned: []string{"c", "cf"}, + Boolean: []string{"d", "df"}, + Float: []string{"e", "ef"}, + Tag: []string{"f", "ff"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": true, + "b": true, + "c": true, + "d": true, + "e": true, + "f": true, + "af": false, + "bf": false, + "cf": false, + "df": false, + "ef": false, + "ff": false, + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "f": "true", + "ff": "false", + }, + map[string]interface{}{ + "a": "true", + "af": "false", + "b": int64(1), + "bf": int64(0), + "c": uint64(1), + "cf": uint64(0), + "d": true, + "df": false, + "e": 1.0, + "ef": 0.0, + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "from float field", + converter: &Converter{ + Fields: &Conversion{ + String: []string{"a"}, + Integer: []string{"b", "too_large_int", "too_small_int"}, + Unsigned: []string{"c", "negative_uint", "too_large_uint", "too_small_uint"}, + Boolean: []string{"d"}, + Float: []string{"e"}, + Tag: []string{"f"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": 42.0, + "b": 42.0, + "c": 42.0, + "d": 42.0, + "e": 42.0, + "f": 42.0, + "too_large_int": math.MaxFloat64, + "too_large_uint": math.MaxFloat64, + "too_small_int": -math.MaxFloat64, + "too_small_uint": -math.MaxFloat64, + "negative_uint": -42.0, + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{ + "f": "42", + }, + map[string]interface{}{ + "a": "42", + "b": int64(42), + "c": uint64(42), + "d": true, + "e": 42.0, + "too_large_int": int64(math.MaxInt64), + "too_large_uint": uint64(math.MaxUint64), + "too_small_int": int64(math.MinInt64), + "too_small_uint": uint64(0), + "negative_uint": uint64(0), + }, + time.Unix(0, 0), + ), + ), + }, + { + name: "globbing", + converter: &Converter{ + Fields: &Conversion{ + Integer: []string{"int_*"}, + }, + }, + input: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "int_a": "1", + "int_b": "2", + "float_a": 1.0, + }, + time.Unix(0, 0), + ), + ), + expected: Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "int_a": int64(1), + "int_b": int64(2), + "float_a": 1.0, + }, + time.Unix(0, 0), + ), + ), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := tt.converter.Apply(tt.input) + + require.Equal(t, 1, len(metrics)) + require.Equal(t, tt.expected.Name(), metrics[0].Name()) + require.Equal(t, tt.expected.Tags(), metrics[0].Tags()) + require.Equal(t, tt.expected.Fields(), metrics[0].Fields()) + require.Equal(t, tt.expected.Time(), metrics[0].Time()) + }) + } +}