diff --git a/metric/metric.go b/metric/metric.go index 29345e63c..4f1418b35 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -240,11 +240,11 @@ func (m *metric) Copy() telegraf.Metric { } for i, tag := range m.tags { - m2.tags[i] = tag + m2.tags[i] = &telegraf.Tag{Key: tag.Key, Value: tag.Value} } for i, field := range m.fields { - m2.fields[i] = field + m2.fields[i] = &telegraf.Field{Key: field.Key, Value: field.Value} } return m2 } diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 65580a46f..5a61a2e80 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -6,9 +6,11 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" + _ "github.com/influxdata/telegraf/plugins/processors/pivot" _ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/rename" _ "github.com/influxdata/telegraf/plugins/processors/strings" _ "github.com/influxdata/telegraf/plugins/processors/topk" + _ "github.com/influxdata/telegraf/plugins/processors/unpivot" ) diff --git a/plugins/processors/pivot/README.md b/plugins/processors/pivot/README.md new file mode 100644 index 000000000..7d2fa91b4 --- /dev/null +++ b/plugins/processors/pivot/README.md @@ -0,0 +1,30 @@ +# Pivot Processor + +You can use the `pivot` processor to rotate single valued metrics into a multi +field metric. This transformation often results in data that is more easily +to apply mathematical operators and comparisons between, and flatten into a +more compact representation for write operations with some output data +formats. + +To perform the reverse operation use the [unpivot] processor. + +### Configuration + +```toml +[[processors.pivot]] + ## Tag to use for naming the new field. + tag_key = "name" + ## Field to use as the value of the new field. + value_key = "value" +``` + +### Example + +```diff +- cpu,cpu=cpu0,name=time_idle value=42i +- cpu,cpu=cpu0,name=time_user value=43i ++ cpu,cpu=cpu0 time_idle=42i ++ cpu,cpu=cpu0 time_user=42i +``` + +[unpivot]: /plugins/processors/unpivot/README.md diff --git a/plugins/processors/pivot/pivot.go b/plugins/processors/pivot/pivot.go new file mode 100644 index 000000000..b20c7f758 --- /dev/null +++ b/plugins/processors/pivot/pivot.go @@ -0,0 +1,54 @@ +package pivot + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +const ( + description = "Rotate a single valued metric into a multi field metric" + sampleConfig = ` + ## Tag to use for naming the new field. + tag_key = "name" + ## Field to use as the value of the new field. + value_key = "value" +` +) + +type Pivot struct { + TagKey string `toml:"tag_key"` + ValueKey string `toml:"value_key"` +} + +func (p *Pivot) SampleConfig() string { + return sampleConfig +} + +func (p *Pivot) Description() string { + return description +} + +func (p *Pivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + for _, m := range metrics { + key, ok := m.GetTag(p.TagKey) + if !ok { + continue + } + + value, ok := m.GetField(p.ValueKey) + if !ok { + continue + } + + m.RemoveTag(p.TagKey) + m.RemoveField(p.ValueKey) + m.AddField(key, value) + } + return metrics +} + +func init() { + processors.Add("pivot", func() telegraf.Processor { + return &Pivot{} + }) +} diff --git a/plugins/processors/pivot/pivot_test.go b/plugins/processors/pivot/pivot_test.go new file mode 100644 index 000000000..34924f8fa --- /dev/null +++ b/plugins/processors/pivot/pivot_test.go @@ -0,0 +1,111 @@ +package pivot + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +func TestPivot(t *testing.T) { + now := time.Now() + tests := []struct { + name string + pivot *Pivot + metrics []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "simple", + pivot: &Pivot{ + TagKey: "name", + ValueKey: "value", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_time", + }, + map[string]interface{}{ + "value": int64(42), + }, + now, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "idle_time": int64(42), + }, + now, + ), + }, + }, + { + name: "missing tag", + pivot: &Pivot{ + TagKey: "name", + ValueKey: "value", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "foo": "idle_time", + }, + map[string]interface{}{ + "value": int64(42), + }, + now, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "foo": "idle_time", + }, + map[string]interface{}{ + "value": int64(42), + }, + now, + ), + }, + }, + { + name: "missing field", + pivot: &Pivot{ + TagKey: "name", + ValueKey: "value", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_time", + }, + map[string]interface{}{ + "foo": int64(42), + }, + now, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_time", + }, + map[string]interface{}{ + "foo": int64(42), + }, + now, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := tt.pivot.Apply(tt.metrics...) + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} diff --git a/plugins/processors/unpivot/README.md b/plugins/processors/unpivot/README.md new file mode 100644 index 000000000..beee6c276 --- /dev/null +++ b/plugins/processors/unpivot/README.md @@ -0,0 +1,26 @@ +# Unpivot Processor + +You can use the `unpivot` processor to rotate a multi field series into single valued metrics. This transformation often results in data that is more easy to aggregate across fields. + +To perform the reverse operation use the [pivot] processor. + +### Configuration + +```toml +[[processors.unpivot]] + ## Tag to use for the name. + tag_key = "name" + ## Field to use for the name of the value. + value_key = "value" +``` + +### Example + +```diff +- cpu,cpu=cpu0 time_idle=42i,time_user=43i ++ cpu,cpu=cpu0,name=time_idle value=42i ++ cpu,cpu=cpu0,name=time_user value=43i +``` + +[pivot]: /plugins/processors/pivot/README.md + diff --git a/plugins/processors/unpivot/unpivot.go b/plugins/processors/unpivot/unpivot.go new file mode 100644 index 000000000..4a081a428 --- /dev/null +++ b/plugins/processors/unpivot/unpivot.go @@ -0,0 +1,71 @@ +package unpivot + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +const ( + description = "Rotate multi field metric into several single field metrics" + sampleConfig = ` + ## Tag to use for the name. + tag_key = "name" + ## Field to use for the name of the value. + value_key = "value" +` +) + +type Unpivot struct { + TagKey string `toml:"tag_key"` + ValueKey string `toml:"value_key"` +} + +func (p *Unpivot) SampleConfig() string { + return sampleConfig +} + +func (p *Unpivot) Description() string { + return description +} + +func copyWithoutFields(metric telegraf.Metric) telegraf.Metric { + m := metric.Copy() + + fieldKeys := make([]string, 0, len(m.FieldList())) + for _, field := range m.FieldList() { + fieldKeys = append(fieldKeys, field.Key) + } + + for _, fk := range fieldKeys { + m.RemoveField(fk) + } + + return m +} + +func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + fieldCount := 0 + for _, m := range metrics { + fieldCount += len(m.FieldList()) + } + + results := make([]telegraf.Metric, 0, fieldCount) + + for _, m := range metrics { + base := copyWithoutFields(m) + for _, field := range m.FieldList() { + newMetric := base.Copy() + newMetric.AddField(p.ValueKey, field.Value) + newMetric.AddTag(p.TagKey, field.Key) + results = append(results, newMetric) + } + m.Accept() + } + return results +} + +func init() { + processors.Add("unpivot", func() telegraf.Processor { + return &Unpivot{} + }) +} diff --git a/plugins/processors/unpivot/unpivot_test.go b/plugins/processors/unpivot/unpivot_test.go new file mode 100644 index 000000000..a3a538503 --- /dev/null +++ b/plugins/processors/unpivot/unpivot_test.go @@ -0,0 +1,90 @@ +package unpivot + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +func TestUnpivot(t *testing.T) { + now := time.Now() + tests := []struct { + name string + unpivot *Unpivot + metrics []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "simple", + unpivot: &Unpivot{ + TagKey: "name", + ValueKey: "value", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "idle_time": int64(42), + }, + now, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_time", + }, + map[string]interface{}{ + "value": int64(42), + }, + now, + ), + }, + }, + { + name: "multi fields", + unpivot: &Unpivot{ + TagKey: "name", + ValueKey: "value", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "idle_time": int64(42), + "idle_user": int64(43), + }, + now, + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_time", + }, + map[string]interface{}{ + "value": int64(42), + }, + now, + ), + testutil.MustMetric("cpu", + map[string]string{ + "name": "idle_user", + }, + map[string]interface{}{ + "value": int64(43), + }, + now, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := tt.unpivot.Apply(tt.metrics...) + testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics()) + }) + } +}