Add Enum Processor (#3772)

This commit is contained in:
Karsten Schnitter 2018-07-04 00:32:52 +02:00 committed by Daniel Nelson
parent c389a68f19
commit 515ff03364
4 changed files with 252 additions and 0 deletions

View File

@ -2,6 +2,7 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"

View File

@ -0,0 +1,34 @@
# Enum Processor Plugin
The Enum Processor allows the configuration of value mappings for metric fields.
The main use-case for this is to rewrite status codes such as _red_, _amber_ and
_green_ by numeric values such as 0, 1, 2. The plugin supports string and bool
types for the field values. Multiple Fields can be configured with separate
value mappings for each field. Default mapping values can be configured to be
used for all values, which are not contained in the value_mappings. The
processor supports explicit configuration of a destination field. By default the
source field is overwritten.
### Configuration
Configuration using table syntax:
`toml
# Configure a status mapping for field 'status'
[[processors.enum.fields]]
source = "status"
destination = "code"
default = -1
[processors.enum.fields.value_mappings]
green = 0
yellow = 1
red = 2
`
Configuration using inline syntax:
`toml
# Configure a status mapping for field 'status'
[[processors.enum.fields]]
source = "status"
destination = "code"
default = -1
value_mappings = {green = 0, yellow = 1, red = 2 }
`

View File

@ -0,0 +1,111 @@
package enum
import (
"strconv"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
var sampleConfig = `
## NOTE This processor will map metric values to different values. It is aimed
## to map enum values to numeric values.
## Fields to be considered
# [[processors.enum.fields]]
#
# Name of the field source field to map
# source = "name"
#
# Optional destination field to be used for the mapped value. Source field is
# used, when no explicit destination is configured.
# destination = "mapped"
#
# Optional default value to be used for all values not contained in the mapping
# table. Only applied when configured.
# default = 0
#
# Value Mapping Table
# [processors.enum.value_mappings]
# value1 = 1
# value2 = 2
#
## Alternatively the mapping table can be given in inline notation
# value_mappings = {value1 = 1, value2 = 2}
`
type EnumMapper struct {
Fields []Mapping
}
type Mapping struct {
Source string
Destination string
Default interface{}
ValueMappings map[string]interface{}
}
func (mapper *EnumMapper) SampleConfig() string {
return sampleConfig
}
func (mapper *EnumMapper) Description() string {
return "Map enum values according to given table."
}
func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
for i := 0; i < len(in); i++ {
in[i] = mapper.applyMappings(in[i])
}
return in
}
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
for _, mapping := range mapper.Fields {
if originalValue, isPresent := metric.GetField(mapping.Source); isPresent == true {
if adjustedValue, isString := adjustBoolValue(originalValue).(string); isString == true {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent == true {
writeField(metric, mapping.getDestination(), mappedValue)
}
}
}
}
return metric
}
func adjustBoolValue(in interface{}) interface{} {
if mappedBool, isBool := in.(bool); isBool == true {
return strconv.FormatBool(mappedBool)
}
return in
}
func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
if mapped, found := mapping.ValueMappings[original]; found == true {
return mapped, true
}
if mapping.Default != nil {
return mapping.Default, true
}
return original, false
}
func (mapping *Mapping) getDestination() string {
if mapping.Destination != "" {
return mapping.Destination
}
return mapping.Source
}
func writeField(metric telegraf.Metric, name string, value interface{}) {
if metric.HasField(name) {
metric.RemoveField(name)
}
metric.AddField(name, value)
}
func init() {
processors.Add("enum", func() telegraf.Processor {
return &EnumMapper{}
})
}

View File

@ -0,0 +1,106 @@
package enum
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{
"string_value": "test",
"int_value": int(13),
"true_value": true,
},
time.Now(),
)
return metric
}
func calculateProcessedValues(mapper EnumMapper, metric telegraf.Metric) map[string]interface{} {
processed := mapper.Apply(metric)
return processed[0].Fields()
}
func assertFieldValue(t *testing.T, expected interface{}, field string, fields map[string]interface{}) {
value, present := fields[field]
assert.True(t, present, "value of field '"+field+"' was not present")
assert.EqualValues(t, expected, value)
}
func TestRetainsMetric(t *testing.T) {
mapper := EnumMapper{}
source := createTestMetric()
target := mapper.Apply(source)[0]
fields := target.Fields()
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 13, "int_value", fields)
assertFieldValue(t, true, "true_value", fields)
assert.Equal(t, "m1", target.Name())
assert.Equal(t, source.Tags(), target.Tags())
assert.Equal(t, source.Time(), target.Time())
}
func TestMapsSingleStringValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoFailureOnMappingsOnNonStringValuedFields(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "int_value", ValueMappings: map[string]interface{}{"13i": int64(7)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 13, "int_value", fields)
}
func TestMapSingleBoolValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "true_value", ValueMappings: map[string]interface{}{"true": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "true_value", fields)
}
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 42, "string_value", fields)
}
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields)
}
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
}
func TestWritesToDestination(t *testing.T) {
mapper := EnumMapper{Fields: []Mapping{{Source: "string_value", Destination: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields)
assertFieldValue(t, 1, "string_code", fields)
}