From ca65d52c9a8c19b8547973e0b835911d23091bce Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 9 Mar 2020 14:08:38 -0700 Subject: [PATCH] Add support for converting tag or field to measurement in converter processor (#7049) --- plugins/processors/converter/README.md | 39 +- plugins/processors/converter/converter.go | 114 +++-- .../processors/converter/converter_test.go | 467 ++++++++++-------- 3 files changed, 342 insertions(+), 278 deletions(-) diff --git a/plugins/processors/converter/README.md b/plugins/processors/converter/README.md index d56985d84..d916c8764 100644 --- a/plugins/processors/converter/README.md +++ b/plugins/processors/converter/README.md @@ -9,7 +9,7 @@ Values that cannot be converted are dropped. uniquely identifiable. Fields with the same series key (measurement + tags) will overwrite one another. -### Configuration: +### Configuration ```toml # Convert values to another metric value type [[processors.converter]] @@ -19,6 +19,7 @@ will overwrite one another. ## select the keys to convert. The array may contain globs. ## = [...] [processors.converter.tags] + measurement = [] string = [] integer = [] unsigned = [] @@ -31,6 +32,7 @@ will overwrite one another. ## select the keys to convert. The array may contain globs. ## = [...] [processors.converter.fields] + measurement = [] tag = [] string = [] integer = [] @@ -39,19 +41,40 @@ will overwrite one another. float = [] ``` -### Examples: +### Example +Convert `port` tag to a string field: ```toml [[processors.converter]] [processors.converter.tags] string = ["port"] - - [processors.converter.fields] - integer = ["scboard_*"] - tag = ["ParentServerConfigGeneration"] ``` ```diff -- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerConfigGeneration=3,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 1502489900000000000 -+ apache,server=debian-stretch-apache,ParentServerConfigGeneration=3 port="80",BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i 1502489900000000000 +- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0 ++ apache,server=debian-stretch-apache port="80",BusyWorkers=1,BytesPerReq=0 +``` + +Convert all `scboard_*` fields to an integer: +```toml +[[processors.converter]] + [processors.converter.fields] + integer = ["scboard_*"] +``` + +```diff +- apache scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 ++ apache scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i +``` + +Rename the measurement from a tag value: +```toml +[[processors.converter]] + [processors.converter.tags] + measurement = ["topic"] +``` + +```diff +- mqtt_consumer,topic=sensor temp=42 ++ sensor temp=42 ``` diff --git a/plugins/processors/converter/converter.go b/plugins/processors/converter/converter.go index bf9b851fb..33f2e43c0 100644 --- a/plugins/processors/converter/converter.go +++ b/plugins/processors/converter/converter.go @@ -2,7 +2,6 @@ package converter import ( "fmt" - "log" "math" "strconv" @@ -18,6 +17,7 @@ var sampleConfig = ` ## select the keys to convert. The array may contain globs. ## = [...] [processors.converter.tags] + measurement = [] string = [] integer = [] unsigned = [] @@ -30,6 +30,7 @@ var sampleConfig = ` ## select the keys to convert. The array may contain globs. ## = [...] [processors.converter.fields] + measurement = [] tag = [] string = [] integer = [] @@ -39,30 +40,32 @@ var sampleConfig = ` ` type Conversion struct { - Tag []string `toml:"tag"` - String []string `toml:"string"` - Integer []string `toml:"integer"` - Unsigned []string `toml:"unsigned"` - Boolean []string `toml:"boolean"` - Float []string `toml:"float"` + Measurement []string `toml:"measurement"` + Tag []string `toml:"tag"` + String []string `toml:"string"` + Integer []string `toml:"integer"` + Unsigned []string `toml:"unsigned"` + Boolean []string `toml:"boolean"` + Float []string `toml:"float"` } type Converter struct { - Tags *Conversion `toml:"tags"` - Fields *Conversion `toml:"fields"` + Tags *Conversion `toml:"tags"` + Fields *Conversion `toml:"fields"` + Log telegraf.Logger `toml:"-"` - initialized bool tagConversions *ConversionFilter fieldConversions *ConversionFilter } type ConversionFilter struct { - Tag filter.Filter - String filter.Filter - Integer filter.Filter - Unsigned filter.Filter - Boolean filter.Filter - Float filter.Filter + Measurement filter.Filter + Tag filter.Filter + String filter.Filter + Integer filter.Filter + Unsigned filter.Filter + Boolean filter.Filter + Float filter.Filter } func (p *Converter) SampleConfig() string { @@ -73,15 +76,11 @@ func (p *Converter) Description() string { return "Convert values to another metric value type" } -func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric { - if !p.initialized { - err := p.compile() - if err != nil { - logPrintf("initialization error: %v\n", err) - return metrics - } - } +func (p *Converter) Init() error { + return p.compile() +} +func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric { for _, metric := range metrics { p.convertTags(metric) p.convertFields(metric) @@ -106,7 +105,6 @@ func (p *Converter) compile() error { p.tagConversions = tf p.fieldConversions = ff - p.initialized = true return nil } @@ -117,6 +115,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) { var err error cf := &ConversionFilter{} + cf.Measurement, err = filter.Compile(conv.Measurement) + if err != nil { + return nil, err + } + cf.Tag, err = filter.Compile(conv.Tag) if err != nil { return nil, err @@ -150,13 +153,19 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) { return cf, nil } -// convertTags converts tags into fields +// convertTags converts tags into measurements or fields. func (p *Converter) convertTags(metric telegraf.Metric) { if p.tagConversions == nil { return } for key, value := range metric.Tags() { + if p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key) { + metric.RemoveTag(key) + metric.SetName(value) + continue + } + if p.tagConversions.String != nil && p.tagConversions.String.Match(key) { metric.RemoveTag(key) metric.AddField(key, value) @@ -167,7 +176,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) { v, ok := toInteger(value) if !ok { metric.RemoveTag(key) - logPrintf("error converting to integer [%T]: %v\n", value, value) + p.Log.Errorf("error converting to integer [%T]: %v", value, value) continue } @@ -179,7 +188,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) { v, ok := toUnsigned(value) if !ok { metric.RemoveTag(key) - logPrintf("error converting to unsigned [%T]: %v\n", value, value) + p.Log.Errorf("error converting to unsigned [%T]: %v", value, value) continue } @@ -192,7 +201,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) { v, ok := toBool(value) if !ok { metric.RemoveTag(key) - logPrintf("error converting to boolean [%T]: %v\n", value, value) + p.Log.Errorf("error converting to boolean [%T]: %v", value, value) continue } @@ -205,7 +214,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) { v, ok := toFloat(value) if !ok { metric.RemoveTag(key) - logPrintf("error converting to float [%T]: %v\n", value, value) + p.Log.Errorf("error converting to float [%T]: %v", value, value) continue } @@ -216,18 +225,31 @@ func (p *Converter) convertTags(metric telegraf.Metric) { } } -// convertFields converts fields into tags or other field types +// convertFields converts fields into measurements, tags, or other field types. func (p *Converter) convertFields(metric telegraf.Metric) { if p.fieldConversions == nil { return } for key, value := range metric.Fields() { + if p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key) { + v, ok := toString(value) + if !ok { + metric.RemoveField(key) + p.Log.Errorf("error converting to measurement [%T]: %v", value, value) + continue + } + + metric.RemoveField(key) + metric.SetName(v) + continue + } + if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) { v, ok := toString(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to tag [%T]: %v\n", value, value) + p.Log.Errorf("error converting to tag [%T]: %v", value, value) continue } @@ -240,7 +262,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) { v, ok := toFloat(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to float [%T]: %v\n", value, value) + p.Log.Errorf("error converting to float [%T]: %v", value, value) continue } @@ -253,7 +275,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) { v, ok := toInteger(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to integer [%T]: %v\n", value, value) + p.Log.Errorf("error converting to integer [%T]: %v", value, value) continue } @@ -266,7 +288,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) { v, ok := toUnsigned(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to unsigned [%T]: %v\n", value, value) + p.Log.Errorf("error converting to unsigned [%T]: %v", value, value) continue } @@ -279,7 +301,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) { v, ok := toBool(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to bool [%T]: %v\n", value, value) + p.Log.Errorf("error converting to bool [%T]: %v", value, value) continue } @@ -292,7 +314,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) { v, ok := toString(value) if !ok { metric.RemoveField(key) - logPrintf("error converting to string [%T]: %v\n", value, value) + p.Log.Errorf("Error converting to string [%T]: %v", value, value) continue } @@ -336,7 +358,7 @@ func toInteger(v interface{}) (int64, bool) { } else if value > float64(math.MaxInt64) { return math.MaxInt64, true } else { - return int64(Round(value)), true + return int64(math.Round(value)), true } case bool: if value { @@ -375,7 +397,7 @@ func toUnsigned(v interface{}) (uint64, bool) { } else if value > float64(math.MaxUint64) { return math.MaxUint64, true } else { - return uint64(Round(value)), true + return uint64(math.Round(value)), true } case bool: if value { @@ -435,20 +457,6 @@ func toString(v interface{}) (string, bool) { return "", false } -// math.Round was not added until Go 1.10, can be removed when support for Go -// 1.9 is dropped. -func Round(x float64) float64 { - t := math.Trunc(x) - if math.Abs(x-t) >= 0.5 { - return t + math.Copysign(1, x) - } - return t -} - -func logPrintf(format string, v ...interface{}) { - log.Printf("D! [processors.converter] "+format, v...) -} - func init() { processors.Add("converter", func() telegraf.Processor { return &Converter{} diff --git a/plugins/processors/converter/converter_test.go b/plugins/processors/converter/converter_test.go index 1d60a40fb..1310e698a 100644 --- a/plugins/processors/converter/converter_test.go +++ b/plugins/processors/converter/converter_test.go @@ -6,48 +6,17 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) -func Metric(v telegraf.Metric, err error) telegraf.Metric { - if err != nil { - panic(err) - } - return v -} - func TestConverter(t *testing.T) { tests := []struct { name string converter *Converter input telegraf.Metric - expected telegraf.Metric + expected []telegraf.Metric }{ - { - name: "empty", - converter: &Converter{}, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(0, 0), - ), - ), - expected: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "value": 42.0, - }, - time.Unix(0, 0), - ), - ), - }, { name: "from tag", converter: &Converter{ @@ -60,23 +29,21 @@ func TestConverter(t *testing.T) { Tag: []string{"tag"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{ - "float": "42", - "int": "42", - "uint": "42", - "bool": "true", - "string": "howdy", - "tag": "tag", - }, - map[string]interface{}{}, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{ + "float": "42", + "int": "42", + "uint": "42", + "bool": "true", + "string": "howdy", + "tag": "tag", + }, + map[string]interface{}{}, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "tag": "tag", @@ -90,7 +57,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "from tag unconvertible", @@ -102,27 +69,25 @@ func TestConverter(t *testing.T) { Float: []string{"float"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{ - "float": "a", - "int": "b", - "uint": "c", - "bool": "maybe", - }, - map[string]interface{}{}, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{ + "float": "a", + "int": "b", + "uint": "c", + "bool": "maybe", + }, + map[string]interface{}{}, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{}, map[string]interface{}{}, time.Unix(0, 0), ), - ), + }, }, { name: "from string field", @@ -136,29 +101,27 @@ func TestConverter(t *testing.T) { Tag: []string{"f"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": "howdy", - "b": "42", - "b1": "42.2", - "b2": "42.5", - "b3": "0x2A", - "c": "42", - "c1": "42.2", - "c2": "42.5", - "c3": "0x2A", - "d": "true", - "e": "42.0", - "f": "foo", - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": "howdy", + "b": "42", + "b1": "42.2", + "b2": "42.5", + "b3": "0x2A", + "c": "42", + "c1": "42.2", + "c2": "42.5", + "c3": "0x2A", + "d": "true", + "e": "42.0", + "f": "foo", + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "f": "foo", @@ -178,7 +141,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "from string field unconvertible", @@ -190,27 +153,25 @@ func TestConverter(t *testing.T) { Float: []string{"d"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": "a", - "b": "b", - "c": "c", - "d": "d", - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": "a", + "b": "b", + "c": "c", + "d": "d", + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{}, map[string]interface{}{}, time.Unix(0, 0), ), - ), + }, }, { name: "from integer field", @@ -224,24 +185,22 @@ func TestConverter(t *testing.T) { Tag: []string{"f"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": int64(42), - "b": int64(42), - "c": int64(42), - "d": int64(42), - "e": int64(42), - "f": int64(42), - "negative_uint": int64(-42), - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": int64(42), + "b": int64(42), + "c": int64(42), + "d": int64(42), + "e": int64(42), + "f": int64(42), + "negative_uint": int64(-42), + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "f": "42", @@ -256,7 +215,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "from unsigned field", @@ -270,24 +229,22 @@ func TestConverter(t *testing.T) { Tag: []string{"f"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": uint64(42), - "b": uint64(42), - "c": uint64(42), - "d": uint64(42), - "e": uint64(42), - "f": uint64(42), - "overflow_int": uint64(math.MaxUint64), - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": uint64(42), + "b": uint64(42), + "c": uint64(42), + "d": uint64(42), + "e": uint64(42), + "f": uint64(42), + "overflow_int": uint64(math.MaxUint64), + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "f": "42", @@ -302,7 +259,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "out of range for unsigned", @@ -311,19 +268,17 @@ func TestConverter(t *testing.T) { Unsigned: []string{"a", "b"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": int64(-42), - "b": math.MaxFloat64, - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": int64(-42), + "b": math.MaxFloat64, + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{}, map[string]interface{}{ @@ -332,7 +287,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "boolean field", @@ -346,29 +301,27 @@ func TestConverter(t *testing.T) { Tag: []string{"f", "ff"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": true, - "b": true, - "c": true, - "d": true, - "e": true, - "f": true, - "af": false, - "bf": false, - "cf": false, - "df": false, - "ef": false, - "ff": false, - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": true, + "b": true, + "c": true, + "d": true, + "e": true, + "f": true, + "af": false, + "bf": false, + "cf": false, + "df": false, + "ef": false, + "ff": false, + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "f": "true", @@ -388,7 +341,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "from float field", @@ -402,28 +355,26 @@ func TestConverter(t *testing.T) { Tag: []string{"f"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "a": 42.0, - "b": 42.0, - "c": 42.0, - "d": 42.0, - "e": 42.0, - "f": 42.0, - "too_large_int": math.MaxFloat64, - "too_large_uint": math.MaxFloat64, - "too_small_int": -math.MaxFloat64, - "too_small_uint": -math.MaxFloat64, - "negative_uint": -42.0, - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "a": 42.0, + "b": 42.0, + "c": 42.0, + "d": 42.0, + "e": 42.0, + "f": 42.0, + "too_large_int": math.MaxFloat64, + "too_large_uint": math.MaxFloat64, + "too_small_int": -math.MaxFloat64, + "too_small_uint": -math.MaxFloat64, + "negative_uint": -42.0, + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{ "f": "42", @@ -442,7 +393,7 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, { name: "globbing", @@ -451,20 +402,18 @@ func TestConverter(t *testing.T) { Integer: []string{"int_*"}, }, }, - input: Metric( - metric.New( - "cpu", - map[string]string{}, - map[string]interface{}{ - "int_a": "1", - "int_b": "2", - "float_a": 1.0, - }, - time.Unix(0, 0), - ), + input: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "int_a": "1", + "int_b": "2", + "float_a": 1.0, + }, + time.Unix(0, 0), ), - expected: Metric( - metric.New( + expected: []telegraf.Metric{ + testutil.MustMetric( "cpu", map[string]string{}, map[string]interface{}{ @@ -474,18 +423,102 @@ func TestConverter(t *testing.T) { }, time.Unix(0, 0), ), - ), + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - metrics := tt.converter.Apply(tt.input) + tt.converter.Log = testutil.Logger{} - require.Equal(t, 1, len(metrics)) - require.Equal(t, tt.expected.Name(), metrics[0].Name()) - require.Equal(t, tt.expected.Tags(), metrics[0].Tags()) - require.Equal(t, tt.expected.Fields(), metrics[0].Fields()) - require.Equal(t, tt.expected.Time(), metrics[0].Time()) + err := tt.converter.Init() + require.NoError(t, err) + actual := tt.converter.Apply(tt.input) + + testutil.RequireMetricsEqual(t, tt.expected, actual) }) } } + +func TestMeasurement(t *testing.T) { + tests := []struct { + name string + converter *Converter + input telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "measurement from tag", + converter: &Converter{ + Tags: &Conversion{ + Measurement: []string{"filepath"}, + }, + }, + input: testutil.MustMetric( + "file", + map[string]string{ + "filepath": "/var/log/syslog", + }, + map[string]interface{}{ + "msg": "howdy", + }, + time.Unix(0, 0), + ), + expected: []telegraf.Metric{ + testutil.MustMetric( + "/var/log/syslog", + map[string]string{}, + map[string]interface{}{ + "msg": "howdy", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "measurement from field", + converter: &Converter{ + Fields: &Conversion{ + Measurement: []string{"topic"}, + }, + }, + input: testutil.MustMetric( + "file", + map[string]string{}, + map[string]interface{}{ + "v": 1, + "topic": "telegraf", + }, + time.Unix(0, 0), + ), + expected: []telegraf.Metric{ + testutil.MustMetric( + "telegraf", + map[string]string{}, + map[string]interface{}{ + "v": 1, + }, + time.Unix(0, 0), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.converter.Log = testutil.Logger{} + err := tt.converter.Init() + require.NoError(t, err) + + actual := tt.converter.Apply(tt.input) + + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +func TestEmptyConfigInitError(t *testing.T) { + converter := &Converter{ + Log: testutil.Logger{}, + } + err := converter.Init() + require.Error(t, err) +}