Use operation subtables in enum and rename processors (#4672)
This commit is contained in:
parent
9d72d078a3
commit
eff7f0f083
|
@ -13,13 +13,13 @@ source field is overwritten.
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[[processors.enum]]
|
[[processors.enum]]
|
||||||
[[processors.enum.fields]]
|
[[processors.enum.mapping]]
|
||||||
## Name of the field to map
|
## Name of the field to map
|
||||||
source = "name"
|
field = "status"
|
||||||
|
|
||||||
## Destination field to be used for the mapped value. By default the source
|
## Destination field to be used for the mapped value. By default the source
|
||||||
## field is used, overwriting the original value.
|
## field is used, overwriting the original value.
|
||||||
# destination = "mapped"
|
# dest = "status_code"
|
||||||
|
|
||||||
## Default value to be used for all values not contained in the mapping
|
## Default value to be used for all values not contained in the mapping
|
||||||
## table. When unset, the unmodified value for the field will be used if no
|
## table. When unset, the unmodified value for the field will be used if no
|
||||||
|
@ -27,7 +27,15 @@ source field is overwritten.
|
||||||
# default = 0
|
# default = 0
|
||||||
|
|
||||||
## Table of mappings
|
## Table of mappings
|
||||||
[processors.enum.fields.value_mappings]
|
[processors.enum.mapping.value_mappings]
|
||||||
value1 = 1
|
green = 1
|
||||||
value2 = 2
|
amber = 2
|
||||||
|
red = 3
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example:
|
||||||
|
|
||||||
|
```diff
|
||||||
|
- xyzzy status="green" 1502489900000000000
|
||||||
|
+ xyzzy status="green",status_code=1i 1502489900000000000
|
||||||
```
|
```
|
||||||
|
|
|
@ -8,13 +8,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
[[processors.enum.fields]]
|
[[processors.enum.mapping]]
|
||||||
## Name of the field to map
|
## Name of the field to map
|
||||||
source = "name"
|
field = "status"
|
||||||
|
|
||||||
## Destination field to be used for the mapped value. By default the source
|
## Destination field to be used for the mapped value. By default the source
|
||||||
## field is used, overwriting the original value.
|
## field is used, overwriting the original value.
|
||||||
# destination = "mapped"
|
# dest = "status_code"
|
||||||
|
|
||||||
## Default value to be used for all values not contained in the mapping
|
## Default value to be used for all values not contained in the mapping
|
||||||
## table. When unset, the unmodified value for the field will be used if no
|
## table. When unset, the unmodified value for the field will be used if no
|
||||||
|
@ -22,18 +22,19 @@ var sampleConfig = `
|
||||||
# default = 0
|
# default = 0
|
||||||
|
|
||||||
## Table of mappings
|
## Table of mappings
|
||||||
[processors.enum.fields.value_mappings]
|
[processors.enum.mapping.value_mappings]
|
||||||
value1 = 1
|
green = 1
|
||||||
value2 = 2
|
yellow = 2
|
||||||
|
red = 3
|
||||||
`
|
`
|
||||||
|
|
||||||
type EnumMapper struct {
|
type EnumMapper struct {
|
||||||
Fields []Mapping
|
Mappings []Mapping `toml:"mapping"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Mapping struct {
|
type Mapping struct {
|
||||||
Source string
|
Field string
|
||||||
Destination string
|
Dest string
|
||||||
Default interface{}
|
Default interface{}
|
||||||
ValueMappings map[string]interface{}
|
ValueMappings map[string]interface{}
|
||||||
}
|
}
|
||||||
|
@ -54,8 +55,8 @@ func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
|
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
|
||||||
for _, mapping := range mapper.Fields {
|
for _, mapping := range mapper.Mappings {
|
||||||
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
|
if originalValue, isPresent := metric.GetField(mapping.Field); isPresent == true {
|
||||||
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
|
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
|
||||||
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
|
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
|
||||||
writeField(metric, mapping.getDestination(), mappedValue)
|
writeField(metric, mapping.getDestination(), mappedValue)
|
||||||
|
@ -84,16 +85,14 @@ func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mapping *Mapping) getDestination() string {
|
func (mapping *Mapping) getDestination() string {
|
||||||
if mapping.Destination != "" {
|
if mapping.Dest != "" {
|
||||||
return mapping.Destination
|
return mapping.Dest
|
||||||
}
|
}
|
||||||
return mapping.Source
|
return mapping.Field
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeField(metric telegraf.Metric, name string, value interface{}) {
|
func writeField(metric telegraf.Metric, name string, value interface{}) {
|
||||||
if metric.HasField(name) {
|
metric.RemoveField(name)
|
||||||
metric.RemoveField(name)
|
|
||||||
}
|
|
||||||
metric.AddField(name, value)
|
metric.AddField(name, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ func TestRetainsMetric(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapsSingleStringValue(t *testing.T) {
|
func TestMapsSingleStringValue(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ func TestMapsSingleStringValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
|
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapSingleBoolValue(t *testing.T) {
|
func TestMapSingleBoolValue(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ func TestMapSingleBoolValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWritesToDestination(t *testing.T) {
|
func TestWritesToDestination(t *testing.T) {
|
||||||
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||||
|
|
||||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||||
|
|
||||||
|
|
|
@ -5,28 +5,23 @@ The `rename` processor renames measurements, fields, and tags.
|
||||||
### Configuration:
|
### Configuration:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
## Measurement, tag, and field renamings are stored in separate sub-tables.
|
|
||||||
## Specify one sub-table per rename operation.
|
|
||||||
[[processors.rename]]
|
[[processors.rename]]
|
||||||
[[processors.rename.measurement]]
|
## Specify one sub-table per rename operation.
|
||||||
## measurement to change
|
[[processors.rename.replace]]
|
||||||
from = "network_interface_throughput"
|
measurement = "network_interface_throughput"
|
||||||
to = "throughput"
|
dest = "throughput"
|
||||||
|
|
||||||
[[processors.rename.tag]]
|
[[processors.rename.replace]]
|
||||||
## tag to change
|
tag = "hostname"
|
||||||
from = "hostname"
|
dest = "host"
|
||||||
to = "host"
|
|
||||||
|
|
||||||
[[processors.rename.field]]
|
[[processors.rename.replace]]
|
||||||
## field to change
|
field = "lower"
|
||||||
from = "lower"
|
dest = "min"
|
||||||
to = "min"
|
|
||||||
|
|
||||||
[[processors.rename.field]]
|
[[processors.rename.replace]]
|
||||||
## field to change
|
field = "upper"
|
||||||
from = "upper"
|
dest = "max"
|
||||||
to = "max"
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Tags:
|
### Tags:
|
||||||
|
@ -36,6 +31,6 @@ No tags are applied by this processor, though it can alter them by renaming.
|
||||||
### Example processing:
|
### Example processing:
|
||||||
|
|
||||||
```diff
|
```diff
|
||||||
- network_interface_throughput,hostname=backend.example.com,units=kbps lower=10i,upper=1000i,mean=500i 1502489900000000000
|
- network_interface_throughput,hostname=backend.example.com lower=10i,upper=1000i,mean=500i 1502489900000000000
|
||||||
+ throughput,host=backend.example.com,units=kbps min=10i,max=1000i,mean=500i 1502489900000000000
|
+ throughput,host=backend.example.com min=10i,max=1000i,mean=500i 1502489900000000000
|
||||||
```
|
```
|
||||||
|
|
|
@ -6,38 +6,17 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
## Measurement, tag, and field renamings are stored in separate sub-tables.
|
|
||||||
## Specify one sub-table per rename operation.
|
|
||||||
# [[processors.rename.measurement]]
|
|
||||||
# ## measurement to change
|
|
||||||
# from = "kilobytes_per_second"
|
|
||||||
# to = "kbps"
|
|
||||||
|
|
||||||
# [[processors.rename.tag]]
|
|
||||||
# ## tag to change
|
|
||||||
# from = "host"
|
|
||||||
# to = "hostname"
|
|
||||||
|
|
||||||
# [[processors.rename.field]]
|
|
||||||
# ## field to change
|
|
||||||
# from = "lower"
|
|
||||||
# to = "min"
|
|
||||||
|
|
||||||
# [[processors.rename.field]]
|
|
||||||
# ## field to change
|
|
||||||
# from = "upper"
|
|
||||||
# to = "max"
|
|
||||||
`
|
`
|
||||||
|
|
||||||
type renamer struct {
|
type Replace struct {
|
||||||
From string
|
Measurement string `toml:"measurement"`
|
||||||
To string
|
Tag string `toml:"tag"`
|
||||||
|
Field string `toml:"field"`
|
||||||
|
Dest string `toml:"dest"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Rename struct {
|
type Rename struct {
|
||||||
Measurement []renamer
|
Replaces []Replace `toml:"replace"`
|
||||||
Tag []renamer
|
|
||||||
Field []renamer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Rename) SampleConfig() string {
|
func (r *Rename) SampleConfig() string {
|
||||||
|
@ -50,24 +29,32 @@ func (r *Rename) Description() string {
|
||||||
|
|
||||||
func (r *Rename) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
func (r *Rename) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
for _, point := range in {
|
for _, point := range in {
|
||||||
for _, measurementRenamer := range r.Measurement {
|
for _, replace := range r.Replaces {
|
||||||
if point.Name() == measurementRenamer.From {
|
if replace.Dest == "" {
|
||||||
point.SetName(measurementRenamer.To)
|
continue
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for _, tagRenamer := range r.Tag {
|
if replace.Measurement != "" {
|
||||||
if value, ok := point.GetTag(tagRenamer.From); ok {
|
if value := point.Name(); value == replace.Measurement {
|
||||||
point.RemoveTag(tagRenamer.From)
|
point.SetName(replace.Dest)
|
||||||
point.AddTag(tagRenamer.To, value)
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for _, fieldRenamer := range r.Field {
|
if replace.Tag != "" {
|
||||||
if value, ok := point.GetField(fieldRenamer.From); ok {
|
if value, ok := point.GetTag(replace.Tag); ok {
|
||||||
point.RemoveField(fieldRenamer.From)
|
point.RemoveTag(replace.Tag)
|
||||||
point.AddField(fieldRenamer.To, value)
|
point.AddTag(replace.Dest, value)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if replace.Field != "" {
|
||||||
|
if value, ok := point.GetField(replace.Field); ok {
|
||||||
|
point.RemoveField(replace.Field)
|
||||||
|
point.AddField(replace.Dest, value)
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,10 +21,11 @@ func newMetric(name string, tags map[string]string, fields map[string]interface{
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMeasurementRename(t *testing.T) {
|
func TestMeasurementRename(t *testing.T) {
|
||||||
r := Rename{}
|
r := Rename{
|
||||||
r.Measurement = []renamer{
|
Replaces: []Replace{
|
||||||
{From: "foo", To: "bar"},
|
{Measurement: "foo", Dest: "bar"},
|
||||||
{From: "baz", To: "quux"},
|
{Measurement: "baz", Dest: "quux"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
m1 := newMetric("foo", nil, nil)
|
m1 := newMetric("foo", nil, nil)
|
||||||
m2 := newMetric("bar", nil, nil)
|
m2 := newMetric("bar", nil, nil)
|
||||||
|
@ -36,9 +37,10 @@ func TestMeasurementRename(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTagRename(t *testing.T) {
|
func TestTagRename(t *testing.T) {
|
||||||
r := Rename{}
|
r := Rename{
|
||||||
r.Tag = []renamer{
|
Replaces: []Replace{
|
||||||
{From: "hostname", To: "host"},
|
{Tag: "hostname", Dest: "host"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
m := newMetric("foo", map[string]string{"hostname": "localhost", "region": "east-1"}, nil)
|
m := newMetric("foo", map[string]string{"hostname": "localhost", "region": "east-1"}, nil)
|
||||||
results := r.Apply(m)
|
results := r.Apply(m)
|
||||||
|
@ -47,9 +49,10 @@ func TestTagRename(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldRename(t *testing.T) {
|
func TestFieldRename(t *testing.T) {
|
||||||
r := Rename{}
|
r := Rename{
|
||||||
r.Field = []renamer{
|
Replaces: []Replace{
|
||||||
{From: "time_msec", To: "time"},
|
{Field: "time_msec", Dest: "time"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
m := newMetric("foo", nil, map[string]interface{}{"time_msec": int64(1250), "snakes": true})
|
m := newMetric("foo", nil, map[string]interface{}{"time_msec": int64(1250), "snakes": true})
|
||||||
results := r.Apply(m)
|
results := r.Apply(m)
|
||||||
|
|
Loading…
Reference in New Issue