Add pivot and unpivot processors (#5991)

This commit is contained in:
Daniel Nelson 2019-06-14 15:26:56 -07:00 committed by GitHub
parent 7f04511c30
commit 1ea7863b9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 386 additions and 2 deletions

View File

@ -240,11 +240,11 @@ func (m *metric) Copy() telegraf.Metric {
} }
for i, tag := range m.tags { for i, tag := range m.tags {
m2.tags[i] = tag m2.tags[i] = &telegraf.Tag{Key: tag.Key, Value: tag.Value}
} }
for i, field := range m.fields { for i, field := range m.fields {
m2.fields[i] = field m2.fields[i] = &telegraf.Field{Key: field.Key, Value: field.Value}
} }
return m2 return m2
} }

View File

@ -6,9 +6,11 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/pivot"
_ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/rename" _ "github.com/influxdata/telegraf/plugins/processors/rename"
_ "github.com/influxdata/telegraf/plugins/processors/strings" _ "github.com/influxdata/telegraf/plugins/processors/strings"
_ "github.com/influxdata/telegraf/plugins/processors/topk" _ "github.com/influxdata/telegraf/plugins/processors/topk"
_ "github.com/influxdata/telegraf/plugins/processors/unpivot"
) )

View File

@ -0,0 +1,30 @@
# Pivot Processor
You can use the `pivot` processor to rotate single valued metrics into a multi
field metric. This transformation often results in data that is more easily
to apply mathematical operators and comparisons between, and flatten into a
more compact representation for write operations with some output data
formats.
To perform the reverse operation use the [unpivot] processor.
### Configuration
```toml
[[processors.pivot]]
## Tag to use for naming the new field.
tag_key = "name"
## Field to use as the value of the new field.
value_key = "value"
```
### Example
```diff
- cpu,cpu=cpu0,name=time_idle value=42i
- cpu,cpu=cpu0,name=time_user value=43i
+ cpu,cpu=cpu0 time_idle=42i
+ cpu,cpu=cpu0 time_user=42i
```
[unpivot]: /plugins/processors/unpivot/README.md

View File

@ -0,0 +1,54 @@
package pivot
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
const (
description = "Rotate a single valued metric into a multi field metric"
sampleConfig = `
## Tag to use for naming the new field.
tag_key = "name"
## Field to use as the value of the new field.
value_key = "value"
`
)
type Pivot struct {
TagKey string `toml:"tag_key"`
ValueKey string `toml:"value_key"`
}
func (p *Pivot) SampleConfig() string {
return sampleConfig
}
func (p *Pivot) Description() string {
return description
}
func (p *Pivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
for _, m := range metrics {
key, ok := m.GetTag(p.TagKey)
if !ok {
continue
}
value, ok := m.GetField(p.ValueKey)
if !ok {
continue
}
m.RemoveTag(p.TagKey)
m.RemoveField(p.ValueKey)
m.AddField(key, value)
}
return metrics
}
func init() {
processors.Add("pivot", func() telegraf.Processor {
return &Pivot{}
})
}

View File

@ -0,0 +1,111 @@
package pivot
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
func TestPivot(t *testing.T) {
now := time.Now()
tests := []struct {
name string
pivot *Pivot
metrics []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "simple",
pivot: &Pivot{
TagKey: "name",
ValueKey: "value",
},
metrics: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_time",
},
map[string]interface{}{
"value": int64(42),
},
now,
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{
"idle_time": int64(42),
},
now,
),
},
},
{
name: "missing tag",
pivot: &Pivot{
TagKey: "name",
ValueKey: "value",
},
metrics: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"foo": "idle_time",
},
map[string]interface{}{
"value": int64(42),
},
now,
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"foo": "idle_time",
},
map[string]interface{}{
"value": int64(42),
},
now,
),
},
},
{
name: "missing field",
pivot: &Pivot{
TagKey: "name",
ValueKey: "value",
},
metrics: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_time",
},
map[string]interface{}{
"foo": int64(42),
},
now,
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_time",
},
map[string]interface{}{
"foo": int64(42),
},
now,
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := tt.pivot.Apply(tt.metrics...)
testutil.RequireMetricsEqual(t, tt.expected, actual)
})
}
}

View File

@ -0,0 +1,26 @@
# Unpivot Processor
You can use the `unpivot` processor to rotate a multi field series into single valued metrics. This transformation often results in data that is more easy to aggregate across fields.
To perform the reverse operation use the [pivot] processor.
### Configuration
```toml
[[processors.unpivot]]
## Tag to use for the name.
tag_key = "name"
## Field to use for the name of the value.
value_key = "value"
```
### Example
```diff
- cpu,cpu=cpu0 time_idle=42i,time_user=43i
+ cpu,cpu=cpu0,name=time_idle value=42i
+ cpu,cpu=cpu0,name=time_user value=43i
```
[pivot]: /plugins/processors/pivot/README.md

View File

@ -0,0 +1,71 @@
package unpivot
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
const (
description = "Rotate multi field metric into several single field metrics"
sampleConfig = `
## Tag to use for the name.
tag_key = "name"
## Field to use for the name of the value.
value_key = "value"
`
)
type Unpivot struct {
TagKey string `toml:"tag_key"`
ValueKey string `toml:"value_key"`
}
func (p *Unpivot) SampleConfig() string {
return sampleConfig
}
func (p *Unpivot) Description() string {
return description
}
func copyWithoutFields(metric telegraf.Metric) telegraf.Metric {
m := metric.Copy()
fieldKeys := make([]string, 0, len(m.FieldList()))
for _, field := range m.FieldList() {
fieldKeys = append(fieldKeys, field.Key)
}
for _, fk := range fieldKeys {
m.RemoveField(fk)
}
return m
}
func (p *Unpivot) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
fieldCount := 0
for _, m := range metrics {
fieldCount += len(m.FieldList())
}
results := make([]telegraf.Metric, 0, fieldCount)
for _, m := range metrics {
base := copyWithoutFields(m)
for _, field := range m.FieldList() {
newMetric := base.Copy()
newMetric.AddField(p.ValueKey, field.Value)
newMetric.AddTag(p.TagKey, field.Key)
results = append(results, newMetric)
}
m.Accept()
}
return results
}
func init() {
processors.Add("unpivot", func() telegraf.Processor {
return &Unpivot{}
})
}

View File

@ -0,0 +1,90 @@
package unpivot
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
func TestUnpivot(t *testing.T) {
now := time.Now()
tests := []struct {
name string
unpivot *Unpivot
metrics []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "simple",
unpivot: &Unpivot{
TagKey: "name",
ValueKey: "value",
},
metrics: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{
"idle_time": int64(42),
},
now,
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_time",
},
map[string]interface{}{
"value": int64(42),
},
now,
),
},
},
{
name: "multi fields",
unpivot: &Unpivot{
TagKey: "name",
ValueKey: "value",
},
metrics: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{
"idle_time": int64(42),
"idle_user": int64(43),
},
now,
),
},
expected: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_time",
},
map[string]interface{}{
"value": int64(42),
},
now,
),
testutil.MustMetric("cpu",
map[string]string{
"name": "idle_user",
},
map[string]interface{}{
"value": int64(43),
},
now,
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := tt.unpivot.Apply(tt.metrics...)
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.SortMetrics())
})
}
}