Add converter processor (#4178)

This commit is contained in:
Daniel Nelson 2018-05-23 14:29:57 -07:00 committed by GitHub
parent 2d0028ce1c
commit 94715c5ae9
4 changed files with 967 additions and 0 deletions

View File

@ -1,6 +1,7 @@
package all package all
import ( import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/regex"

View File

@ -0,0 +1,57 @@
# Converter Processor
The converter processor is used to change the type of tag or field values. In
addition to changing field types it can convert fields to tags and vis versa.
Values that cannot be converted are dropped.
**Note:** When converting tags to fields, take care not to ensure the series is still
uniquely identifiable. Fields with the same series key (measurement + tags)
will overwrite one another.
### Configuration:
```toml
# Convert values to another metric value type
[processors.converter]
## Tags to convert
##
## The table key determines the target type, and the array of key-values
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
string = []
integer = []
unsigned = []
boolean = []
float = []
## Fields to convert
##
## The table key determines the target type, and the array of key-values
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
tag = []
string = []
integer = []
unsigned = []
boolean = []
float = []
```
### Examples:
```toml
[processors.converter]
[processors.converter.tags]
string = ["port"]
[processors.converter.fields]
integer = ["scboard_*"]
tag = ["ParseServerConfigGeneration"]
```
```diff
- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerConfigGeneration=3,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 1502489900000000000
+ apache,server=debian-stretch-apache,ParentServerConfigGeneration=3 port="80",BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i 1502489900000000000
```

View File

@ -0,0 +1,430 @@
package converter
import (
"fmt"
"log"
"math"
"strconv"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/processors"
)
var sampleConfig = `
## Tags to convert
##
## The table key determines the target type, and the array of key-values
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
string = []
integer = []
unsigned = []
boolean = []
float = []
## Fields to convert
##
## The table key determines the target type, and the array of key-values
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
tag = []
string = []
integer = []
unsigned = []
boolean = []
float = []
`
type Conversion struct {
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
}
type Converter struct {
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
initialized bool
tagConversions *ConversionFilter
fieldConversions *ConversionFilter
}
type ConversionFilter struct {
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
}
func (p *Converter) SampleConfig() string {
return sampleConfig
}
func (p *Converter) Description() string {
return "Convert values to another metric value type"
}
func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if !p.initialized {
err := p.compile()
if err != nil {
logPrintf("initialization error: %v\n", err)
return metrics
}
}
for _, metric := range metrics {
p.convertTags(metric)
p.convertFields(metric)
}
return metrics
}
func (p *Converter) compile() error {
tf, err := compileFilter(p.Tags)
if err != nil {
return err
}
ff, err := compileFilter(p.Fields)
if err != nil {
return err
}
if tf == nil && ff == nil {
return fmt.Errorf("no filters found")
}
p.tagConversions = tf
p.fieldConversions = ff
p.initialized = true
return nil
}
func compileFilter(conv *Conversion) (*ConversionFilter, error) {
if conv == nil {
return nil, nil
}
var err error
cf := &ConversionFilter{}
cf.Tag, err = filter.Compile(conv.Tag)
if err != nil {
return nil, err
}
cf.String, err = filter.Compile(conv.String)
if err != nil {
return nil, err
}
cf.Integer, err = filter.Compile(conv.Integer)
if err != nil {
return nil, err
}
cf.Unsigned, err = filter.Compile(conv.Unsigned)
if err != nil {
return nil, err
}
cf.Boolean, err = filter.Compile(conv.Boolean)
if err != nil {
return nil, err
}
cf.Float, err = filter.Compile(conv.Float)
if err != nil {
return nil, err
}
return cf, nil
}
// convertTags converts tags into fields
func (p *Converter) convertTags(metric telegraf.Metric) {
if p.tagConversions == nil {
return
}
for key, value := range metric.Tags() {
if p.tagConversions.String != nil && p.tagConversions.String.Match(key) {
metric.RemoveTag(key)
metric.AddField(key, value)
continue
}
if p.tagConversions.Integer != nil && p.tagConversions.Integer.Match(key) {
v, ok := toInteger(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
continue
}
metric.RemoveTag(key)
metric.AddField(key, v)
}
if p.tagConversions.Unsigned != nil && p.tagConversions.Unsigned.Match(key) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
continue
}
metric.RemoveTag(key)
metric.AddField(key, v)
continue
}
if p.tagConversions.Boolean != nil && p.tagConversions.Boolean.Match(key) {
v, ok := toBool(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to boolean [%T]: %v\n", value, value)
continue
}
metric.RemoveTag(key)
metric.AddField(key, v)
continue
}
if p.tagConversions.Float != nil && p.tagConversions.Float.Match(key) {
v, ok := toFloat(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to float [%T]: %v\n", value, value)
continue
}
metric.RemoveTag(key)
metric.AddField(key, v)
continue
}
}
}
// convertFields converts fields into tags or other field types
func (p *Converter) convertFields(metric telegraf.Metric) {
if p.fieldConversions == nil {
return
}
for key, value := range metric.Fields() {
if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to tag [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddTag(key, v)
continue
}
if p.fieldConversions.Float != nil && p.fieldConversions.Float.Match(key) {
v, ok := toFloat(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddField(key, v)
continue
}
if p.fieldConversions.Integer != nil && p.fieldConversions.Integer.Match(key) {
v, ok := toInteger(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddField(key, v)
continue
}
if p.fieldConversions.Unsigned != nil && p.fieldConversions.Unsigned.Match(key) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddField(key, v)
continue
}
if p.fieldConversions.Boolean != nil && p.fieldConversions.Boolean.Match(key) {
v, ok := toBool(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to bool [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddField(key, v)
continue
}
if p.fieldConversions.String != nil && p.fieldConversions.String.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to string [%T]: %v\n", value, value)
continue
}
metric.RemoveField(key)
metric.AddField(key, v)
continue
}
}
}
func toBool(v interface{}) (bool, bool) {
switch value := v.(type) {
case int64, uint64, float64:
if value != 0 {
return true, true
} else {
return false, false
}
case bool:
return value, true
case string:
result, err := strconv.ParseBool(value)
return result, err == nil
}
return false, false
}
func toInteger(v interface{}) (int64, bool) {
switch value := v.(type) {
case int64:
return value, true
case uint64:
if value <= uint64(math.MaxInt64) {
return int64(value), true
} else {
return math.MaxInt64, true
}
case float64:
if value < float64(math.MinInt64) {
return math.MinInt64, true
} else if value > float64(math.MaxInt64) {
return math.MaxInt64, true
} else {
return int64(value), true
}
case bool:
if value {
return 1, true
} else {
return 0, true
}
case string:
result, err := strconv.ParseInt(value, 10, 64)
return result, err == nil
}
return 0, false
}
func toUnsigned(v interface{}) (uint64, bool) {
switch value := v.(type) {
case uint64:
return value, true
case int64:
if value < 0 {
return 0, true
} else {
return uint64(value), true
}
case float64:
if value < 0.0 {
return 0, true
} else if value > float64(math.MaxUint64) {
return math.MaxUint64, true
} else {
return uint64(value), true
}
case bool:
if value {
return 1, true
} else {
return 0, true
}
case string:
result, err := strconv.ParseUint(value, 10, 64)
return result, err == nil
}
return 0, false
}
func toFloat(v interface{}) (float64, bool) {
switch value := v.(type) {
case int64:
return float64(value), true
case uint64:
return float64(value), true
case float64:
return value, true
case bool:
if value {
return 1.0, true
} else {
return 0.0, true
}
case string:
result, err := strconv.ParseFloat(value, 64)
return result, err == nil
}
return 0.0, false
}
func toString(v interface{}) (string, bool) {
switch value := v.(type) {
case int64:
return strconv.FormatInt(value, 10), true
case uint64:
return strconv.FormatUint(value, 10), true
case float64:
return strconv.FormatFloat(value, 'f', -1, 64), true
case bool:
return strconv.FormatBool(value), true
case string:
return value, true
}
return "", false
}
func logPrintf(format string, v ...interface{}) {
log.Printf("D! [processors.converter] "+format, v...)
}
func init() {
processors.Add("converter", func() telegraf.Processor {
return &Converter{}
})
}

View File

@ -0,0 +1,479 @@
package converter
import (
"math"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
)
func Metric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}
func TestConverter(t *testing.T) {
tests := []struct {
name string
converter *Converter
input telegraf.Metric
expected telegraf.Metric
}{
{
name: "empty",
converter: &Converter{},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
},
{
name: "from tag",
converter: &Converter{
Tags: &Conversion{
String: []string{"string"},
Integer: []string{"int"},
Unsigned: []string{"uint"},
Boolean: []string{"bool"},
Float: []string{"float"},
Tag: []string{"tag"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{
"float": "42",
"int": "42",
"uint": "42",
"bool": "true",
"string": "howdy",
"tag": "tag",
},
map[string]interface{}{},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"tag": "tag",
},
map[string]interface{}{
"float": 42.0,
"int": int64(42),
"uint": uint64(42),
"bool": true,
"string": "howdy",
},
time.Unix(0, 0),
),
),
},
{
name: "from tag unconvertible",
converter: &Converter{
Tags: &Conversion{
Integer: []string{"int"},
Unsigned: []string{"uint"},
Boolean: []string{"bool"},
Float: []string{"float"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{
"float": "a",
"int": "b",
"uint": "c",
"bool": "maybe",
},
map[string]interface{}{},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(0, 0),
),
),
},
{
name: "from string field",
converter: &Converter{
Fields: &Conversion{
String: []string{"a"},
Integer: []string{"b"},
Unsigned: []string{"c"},
Boolean: []string{"d"},
Float: []string{"e"},
Tag: []string{"f"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": "howdy",
"b": "42",
"c": "42",
"d": "true",
"e": "42.0",
"f": "foo",
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"f": "foo",
},
map[string]interface{}{
"a": "howdy",
"b": int64(42),
"c": uint64(42),
"d": true,
"e": 42.0,
},
time.Unix(0, 0),
),
),
},
{
name: "from string field unconvertible",
converter: &Converter{
Fields: &Conversion{
Integer: []string{"a"},
Unsigned: []string{"b"},
Boolean: []string{"c"},
Float: []string{"d"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": "a",
"b": "b",
"c": "c",
"d": "d",
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(0, 0),
),
),
},
{
name: "from integer field",
converter: &Converter{
Fields: &Conversion{
String: []string{"a"},
Integer: []string{"b"},
Unsigned: []string{"c", "negative_uint"},
Boolean: []string{"d"},
Float: []string{"e"},
Tag: []string{"f"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": int64(42),
"b": int64(42),
"c": int64(42),
"d": int64(42),
"e": int64(42),
"f": int64(42),
"negative_uint": int64(-42),
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"f": "42",
},
map[string]interface{}{
"a": "42",
"b": int64(42),
"c": uint64(42),
"d": true,
"e": 42.0,
"negative_uint": uint64(0),
},
time.Unix(0, 0),
),
),
},
{
name: "from unsigned field",
converter: &Converter{
Fields: &Conversion{
String: []string{"a"},
Integer: []string{"b", "overflow_int"},
Unsigned: []string{"c"},
Boolean: []string{"d"},
Float: []string{"e"},
Tag: []string{"f"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": uint64(42),
"b": uint64(42),
"c": uint64(42),
"d": uint64(42),
"e": uint64(42),
"f": uint64(42),
"overflow_int": uint64(math.MaxUint64),
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"f": "42",
},
map[string]interface{}{
"a": "42",
"b": int64(42),
"c": uint64(42),
"d": true,
"e": 42.0,
"overflow_int": int64(math.MaxInt64),
},
time.Unix(0, 0),
),
),
},
{
name: "out of range for unsigned",
converter: &Converter{
Fields: &Conversion{
Unsigned: []string{"a", "b"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": int64(-42),
"b": math.MaxFloat64,
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": uint64(0),
"b": uint64(math.MaxUint64),
},
time.Unix(0, 0),
),
),
},
{
name: "boolean field",
converter: &Converter{
Fields: &Conversion{
String: []string{"a", "af"},
Integer: []string{"b", "bf"},
Unsigned: []string{"c", "cf"},
Boolean: []string{"d", "df"},
Float: []string{"e", "ef"},
Tag: []string{"f", "ff"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": true,
"b": true,
"c": true,
"d": true,
"e": true,
"f": true,
"af": false,
"bf": false,
"cf": false,
"df": false,
"ef": false,
"ff": false,
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"f": "true",
"ff": "false",
},
map[string]interface{}{
"a": "true",
"af": "false",
"b": int64(1),
"bf": int64(0),
"c": uint64(1),
"cf": uint64(0),
"d": true,
"df": false,
"e": 1.0,
"ef": 0.0,
},
time.Unix(0, 0),
),
),
},
{
name: "from float field",
converter: &Converter{
Fields: &Conversion{
String: []string{"a"},
Integer: []string{"b", "too_large_int", "too_small_int"},
Unsigned: []string{"c", "negative_uint", "too_large_uint", "too_small_uint"},
Boolean: []string{"d"},
Float: []string{"e"},
Tag: []string{"f"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"a": 42.0,
"b": 42.0,
"c": 42.0,
"d": 42.0,
"e": 42.0,
"f": 42.0,
"too_large_int": math.MaxFloat64,
"too_large_uint": math.MaxFloat64,
"too_small_int": -math.MaxFloat64,
"too_small_uint": -math.MaxFloat64,
"negative_uint": -42.0,
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{
"f": "42",
},
map[string]interface{}{
"a": "42",
"b": int64(42),
"c": uint64(42),
"d": true,
"e": 42.0,
"too_large_int": int64(math.MaxInt64),
"too_large_uint": uint64(math.MaxUint64),
"too_small_int": int64(math.MinInt64),
"too_small_uint": uint64(0),
"negative_uint": uint64(0),
},
time.Unix(0, 0),
),
),
},
{
name: "globbing",
converter: &Converter{
Fields: &Conversion{
Integer: []string{"int_*"},
},
},
input: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"int_a": "1",
"int_b": "2",
"float_a": 1.0,
},
time.Unix(0, 0),
),
),
expected: Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"int_a": int64(1),
"int_b": int64(2),
"float_a": 1.0,
},
time.Unix(0, 0),
),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.converter.Apply(tt.input)
require.Equal(t, 1, len(metrics))
require.Equal(t, tt.expected.Name(), metrics[0].Name())
require.Equal(t, tt.expected.Tags(), metrics[0].Tags())
require.Equal(t, tt.expected.Fields(), metrics[0].Fields())
require.Equal(t, tt.expected.Time(), metrics[0].Time())
})
}
}