Add rename processor (#4528)
This commit is contained in:
parent
6454319062
commit
7ca7f22e50
|
@ -279,6 +279,7 @@ formats may be used with input plugins supporting the `data_format` option:
|
||||||
* [override](./plugins/processors/override)
|
* [override](./plugins/processors/override)
|
||||||
* [printer](./plugins/processors/printer)
|
* [printer](./plugins/processors/printer)
|
||||||
* [regex](./plugins/processors/regex)
|
* [regex](./plugins/processors/regex)
|
||||||
|
* [rename](./plugins/processors/rename)
|
||||||
* [topk](./plugins/processors/topk)
|
* [topk](./plugins/processors/topk)
|
||||||
|
|
||||||
## Aggregator Plugins
|
## Aggregator Plugins
|
||||||
|
|
|
@ -6,5 +6,6 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/rename"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
# Rename Processor Plugin
|
||||||
|
|
||||||
|
The `rename` processor renames measurements, fields, and tags.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
## Measurement, tag, and field renamings are stored in separate sub-tables.
|
||||||
|
## Specify one sub-table per rename operation.
|
||||||
|
[[processors.rename]]
|
||||||
|
[[processors.rename.measurement]]
|
||||||
|
## measurement to change
|
||||||
|
from = "network_interface_throughput"
|
||||||
|
to = "throughput"
|
||||||
|
|
||||||
|
[[processors.rename.tag]]
|
||||||
|
## tag to change
|
||||||
|
from = "hostname"
|
||||||
|
to = "host"
|
||||||
|
|
||||||
|
[[processors.rename.field]]
|
||||||
|
## field to change
|
||||||
|
from = "lower"
|
||||||
|
to = "min"
|
||||||
|
|
||||||
|
[[processors.rename.field]]
|
||||||
|
## field to change
|
||||||
|
from = "upper"
|
||||||
|
to = "max"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
No tags are applied by this processor, though it can alter them by renaming.
|
||||||
|
|
||||||
|
### Example processing:
|
||||||
|
|
||||||
|
```diff
|
||||||
|
- network_interface_throughput,hostname=backend.example.com,units=kbps lower=10i,upper=1000i,mean=500i 1502489900000000000
|
||||||
|
+ throughput,host=backend.example.com,units=kbps min=10i,max=1000i,mean=500i 1502489900000000000
|
||||||
|
```
|
|
@ -0,0 +1,82 @@
|
||||||
|
package rename
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## Measurement, tag, and field renamings are stored in separate sub-tables.
|
||||||
|
## Specify one sub-table per rename operation.
|
||||||
|
# [[processors.rename.measurement]]
|
||||||
|
# ## measurement to change
|
||||||
|
# from = "kilobytes_per_second"
|
||||||
|
# to = "kbps"
|
||||||
|
|
||||||
|
# [[processors.rename.tag]]
|
||||||
|
# ## tag to change
|
||||||
|
# from = "host"
|
||||||
|
# to = "hostname"
|
||||||
|
|
||||||
|
# [[processors.rename.field]]
|
||||||
|
# ## field to change
|
||||||
|
# from = "lower"
|
||||||
|
# to = "min"
|
||||||
|
|
||||||
|
# [[processors.rename.field]]
|
||||||
|
# ## field to change
|
||||||
|
# from = "upper"
|
||||||
|
# to = "max"
|
||||||
|
`
|
||||||
|
|
||||||
|
type renamer struct {
|
||||||
|
From string
|
||||||
|
To string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Rename struct {
|
||||||
|
Measurement []renamer
|
||||||
|
Tag []renamer
|
||||||
|
Field []renamer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Rename) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Rename) Description() string {
|
||||||
|
return "Rename measurements, tags, and fields that pass through this filter."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Rename) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
for _, point := range in {
|
||||||
|
for _, measurementRenamer := range r.Measurement {
|
||||||
|
if point.Name() == measurementRenamer.From {
|
||||||
|
point.SetName(measurementRenamer.To)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tagRenamer := range r.Tag {
|
||||||
|
if value, ok := point.GetTag(tagRenamer.From); ok {
|
||||||
|
point.RemoveTag(tagRenamer.From)
|
||||||
|
point.AddTag(tagRenamer.To, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fieldRenamer := range r.Field {
|
||||||
|
if value, ok := point.GetField(fieldRenamer.From); ok {
|
||||||
|
point.RemoveField(fieldRenamer.From)
|
||||||
|
point.AddField(fieldRenamer.To, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("rename", func() telegraf.Processor {
|
||||||
|
return &Rename{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
package rename
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newMetric(name string, tags map[string]string, fields map[string]interface{}) telegraf.Metric {
|
||||||
|
if tags == nil {
|
||||||
|
tags = map[string]string{}
|
||||||
|
}
|
||||||
|
if fields == nil {
|
||||||
|
fields = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
m, _ := metric.New(name, tags, fields, time.Now())
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMeasurementRename(t *testing.T) {
|
||||||
|
r := Rename{}
|
||||||
|
r.Measurement = []renamer{
|
||||||
|
{From: "foo", To: "bar"},
|
||||||
|
{From: "baz", To: "quux"},
|
||||||
|
}
|
||||||
|
m1 := newMetric("foo", nil, nil)
|
||||||
|
m2 := newMetric("bar", nil, nil)
|
||||||
|
m3 := newMetric("baz", nil, nil)
|
||||||
|
results := r.Apply(m1, m2, m3)
|
||||||
|
assert.Equal(t, "bar", results[0].Name(), "Should change name from 'foo' to 'bar'")
|
||||||
|
assert.Equal(t, "bar", results[1].Name(), "Should not name from 'bar'")
|
||||||
|
assert.Equal(t, "quux", results[2].Name(), "Should change name from 'baz' to 'quux'")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTagRename(t *testing.T) {
|
||||||
|
r := Rename{}
|
||||||
|
r.Tag = []renamer{
|
||||||
|
{From: "hostname", To: "host"},
|
||||||
|
}
|
||||||
|
m := newMetric("foo", map[string]string{"hostname": "localhost", "region": "east-1"}, nil)
|
||||||
|
results := r.Apply(m)
|
||||||
|
|
||||||
|
assert.Equal(t, map[string]string{"host": "localhost", "region": "east-1"}, results[0].Tags(), "should change tag 'hostname' to 'host'")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFieldRename(t *testing.T) {
|
||||||
|
r := Rename{}
|
||||||
|
r.Field = []renamer{
|
||||||
|
{From: "time_msec", To: "time"},
|
||||||
|
}
|
||||||
|
m := newMetric("foo", nil, map[string]interface{}{"time_msec": int64(1250), "snakes": true})
|
||||||
|
results := r.Apply(m)
|
||||||
|
|
||||||
|
assert.Equal(t, map[string]interface{}{"time": int64(1250), "snakes": true}, results[0].Fields(), "should change field 'time_msec' to 'time'")
|
||||||
|
}
|
Loading…
Reference in New Issue