Add support for converting tag or field to measurement in converter processor (#7049)
This commit is contained in:
@@ -2,7 +2,6 @@ package converter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"strconv"
|
||||
|
||||
@@ -18,6 +17,7 @@ var sampleConfig = `
|
||||
## select the keys to convert. The array may contain globs.
|
||||
## <target-type> = [<tag-key>...]
|
||||
[processors.converter.tags]
|
||||
measurement = []
|
||||
string = []
|
||||
integer = []
|
||||
unsigned = []
|
||||
@@ -30,6 +30,7 @@ var sampleConfig = `
|
||||
## select the keys to convert. The array may contain globs.
|
||||
## <target-type> = [<field-key>...]
|
||||
[processors.converter.fields]
|
||||
measurement = []
|
||||
tag = []
|
||||
string = []
|
||||
integer = []
|
||||
@@ -39,30 +40,32 @@ var sampleConfig = `
|
||||
`
|
||||
|
||||
type Conversion struct {
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
Measurement []string `toml:"measurement"`
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
}
|
||||
|
||||
type Converter struct {
|
||||
Tags *Conversion `toml:"tags"`
|
||||
Fields *Conversion `toml:"fields"`
|
||||
Tags *Conversion `toml:"tags"`
|
||||
Fields *Conversion `toml:"fields"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
initialized bool
|
||||
tagConversions *ConversionFilter
|
||||
fieldConversions *ConversionFilter
|
||||
}
|
||||
|
||||
type ConversionFilter struct {
|
||||
Tag filter.Filter
|
||||
String filter.Filter
|
||||
Integer filter.Filter
|
||||
Unsigned filter.Filter
|
||||
Boolean filter.Filter
|
||||
Float filter.Filter
|
||||
Measurement filter.Filter
|
||||
Tag filter.Filter
|
||||
String filter.Filter
|
||||
Integer filter.Filter
|
||||
Unsigned filter.Filter
|
||||
Boolean filter.Filter
|
||||
Float filter.Filter
|
||||
}
|
||||
|
||||
func (p *Converter) SampleConfig() string {
|
||||
@@ -73,15 +76,11 @@ func (p *Converter) Description() string {
|
||||
return "Convert values to another metric value type"
|
||||
}
|
||||
|
||||
func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||
if !p.initialized {
|
||||
err := p.compile()
|
||||
if err != nil {
|
||||
logPrintf("initialization error: %v\n", err)
|
||||
return metrics
|
||||
}
|
||||
}
|
||||
func (p *Converter) Init() error {
|
||||
return p.compile()
|
||||
}
|
||||
|
||||
func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||
for _, metric := range metrics {
|
||||
p.convertTags(metric)
|
||||
p.convertFields(metric)
|
||||
@@ -106,7 +105,6 @@ func (p *Converter) compile() error {
|
||||
|
||||
p.tagConversions = tf
|
||||
p.fieldConversions = ff
|
||||
p.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -117,6 +115,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
|
||||
|
||||
var err error
|
||||
cf := &ConversionFilter{}
|
||||
cf.Measurement, err = filter.Compile(conv.Measurement)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cf.Tag, err = filter.Compile(conv.Tag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -150,13 +153,19 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
|
||||
return cf, nil
|
||||
}
|
||||
|
||||
// convertTags converts tags into fields
|
||||
// convertTags converts tags into measurements or fields.
|
||||
func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
if p.tagConversions == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for key, value := range metric.Tags() {
|
||||
if p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key) {
|
||||
metric.RemoveTag(key)
|
||||
metric.SetName(value)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.tagConversions.String != nil && p.tagConversions.String.Match(key) {
|
||||
metric.RemoveTag(key)
|
||||
metric.AddField(key, value)
|
||||
@@ -167,7 +176,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
v, ok := toInteger(value)
|
||||
if !ok {
|
||||
metric.RemoveTag(key)
|
||||
logPrintf("error converting to integer [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -179,7 +188,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
v, ok := toUnsigned(value)
|
||||
if !ok {
|
||||
metric.RemoveTag(key)
|
||||
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -192,7 +201,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
v, ok := toBool(value)
|
||||
if !ok {
|
||||
metric.RemoveTag(key)
|
||||
logPrintf("error converting to boolean [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to boolean [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -205,7 +214,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
v, ok := toFloat(value)
|
||||
if !ok {
|
||||
metric.RemoveTag(key)
|
||||
logPrintf("error converting to float [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to float [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -216,18 +225,31 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
||||
}
|
||||
}
|
||||
|
||||
// convertFields converts fields into tags or other field types
|
||||
// convertFields converts fields into measurements, tags, or other field types.
|
||||
func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
if p.fieldConversions == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for key, value := range metric.Fields() {
|
||||
if p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key) {
|
||||
v, ok := toString(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
p.Log.Errorf("error converting to measurement [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
metric.RemoveField(key)
|
||||
metric.SetName(v)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) {
|
||||
v, ok := toString(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to tag [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to tag [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -240,7 +262,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
v, ok := toFloat(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to float [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to float [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -253,7 +275,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
v, ok := toInteger(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to integer [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -266,7 +288,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
v, ok := toUnsigned(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -279,7 +301,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
v, ok := toBool(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to bool [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("error converting to bool [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -292,7 +314,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
||||
v, ok := toString(value)
|
||||
if !ok {
|
||||
metric.RemoveField(key)
|
||||
logPrintf("error converting to string [%T]: %v\n", value, value)
|
||||
p.Log.Errorf("Error converting to string [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -336,7 +358,7 @@ func toInteger(v interface{}) (int64, bool) {
|
||||
} else if value > float64(math.MaxInt64) {
|
||||
return math.MaxInt64, true
|
||||
} else {
|
||||
return int64(Round(value)), true
|
||||
return int64(math.Round(value)), true
|
||||
}
|
||||
case bool:
|
||||
if value {
|
||||
@@ -375,7 +397,7 @@ func toUnsigned(v interface{}) (uint64, bool) {
|
||||
} else if value > float64(math.MaxUint64) {
|
||||
return math.MaxUint64, true
|
||||
} else {
|
||||
return uint64(Round(value)), true
|
||||
return uint64(math.Round(value)), true
|
||||
}
|
||||
case bool:
|
||||
if value {
|
||||
@@ -435,20 +457,6 @@ func toString(v interface{}) (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
// math.Round was not added until Go 1.10, can be removed when support for Go
|
||||
// 1.9 is dropped.
|
||||
func Round(x float64) float64 {
|
||||
t := math.Trunc(x)
|
||||
if math.Abs(x-t) >= 0.5 {
|
||||
return t + math.Copysign(1, x)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func logPrintf(format string, v ...interface{}) {
|
||||
log.Printf("D! [processors.converter] "+format, v...)
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.Add("converter", func() telegraf.Processor {
|
||||
return &Converter{}
|
||||
|
||||
Reference in New Issue
Block a user