Add strings processor (#4476)

This commit is contained in:
bsmaldon 2018-09-05 23:13:29 +01:00 committed by Daniel Nelson
parent 12ff8bb5e0
commit d6467e966f
4 changed files with 766 additions and 0 deletions

View File

@ -7,6 +7,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/strings"
_ "github.com/influxdata/telegraf/plugins/processors/rename"
_ "github.com/influxdata/telegraf/plugins/processors/topk"
)

View File

@ -0,0 +1,83 @@
# Strings Processor Plugin
The `strings` plugin maps certain go string functions onto measurement, tag, and field values. Values can be modified in place or stored in another key.
Implemented functions are:
- lowercase
- uppercase
- trim
- trim_left
- trim_right
- trim_prefix
- trim_suffix
Please note that in this implementation these are processed in the order that they appear above.
Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor.
### Configuration:
```toml
[[processors.strings]]
# [[processors.strings.uppercase]]
# tag = "method"
# [[processors.strings.lowercase]]
# field = "uri_stem"
# dest = "uri_stem_normalised"
## Convert a tag value to lowercase
# [[processors.strings.trim]]
# field = "message"
# [[processors.strings.trim_left]]
# field = "message"
# cutset = "\t"
# [[processors.strings.trim_right]]
# field = "message"
# cutset = "\r\n"
# [[processors.strings.trim_prefix]]
# field = "my_value"
# prefix = "my_"
# [[processors.strings.trim_suffix]]
# field = "read_count"
# suffix = "_count"
```
#### Trim, TrimLeft, TrimRight
The `trim`, `trim_left`, and `trim_right` functions take an optional parameter: `cutset`. This value is a string containing the characters to remove from the value.
#### TrimPrefix, TrimSuffix
The `trim_prefix` and `trim_suffix` functions remote the given `prefix` or `suffix`
respectively from the string.
### Example
**Config**
```toml
[[processors.strings]]
[[processors.strings.lowercase]]
field = "uri-stem"
[[processors.strings.trim_prefix]]
field = "uri_stem"
prefix = "/api/"
[[processors.strings.uppercase]]
field = "cs-host"
dest = "cs-host_normalised"
```
**Input**
```
iis_log,method=get,uri_stem=/API/HealthCheck cs-host="MIXEDCASE_host",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000
```
**Output**
```
iis_log,method=get,uri_stem=healthcheck cs-host="MIXEDCASE_host",cs-host_normalised="MIXEDCASE_HOST",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000
```

View File

@ -0,0 +1,199 @@
package strings
import (
"strings"
"unicode"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
type Strings struct {
Lowercase []converter `toml:"lowercase"`
Uppercase []converter `toml:"uppercase"`
Trim []converter `toml:"trim"`
TrimLeft []converter `toml:"trim_left"`
TrimRight []converter `toml:"trim_right"`
TrimPrefix []converter `toml:"trim_prefix"`
TrimSuffix []converter `toml:"trim_suffix"`
converters []converter
init bool
}
type ConvertFunc func(s string) string
type converter struct {
Field string
Tag string
Measurement string
Dest string
Cutset string
Suffix string
Prefix string
fn ConvertFunc
}
const sampleConfig = `
## Convert a tag value to uppercase
# [[processors.strings.uppercase]]
# tag = "method"
## Convert a field value to lowercase and store in a new field
# [[processors.strings.lowercase]]
# field = "uri_stem"
# dest = "uri_stem_normalised"
## Trim leading and trailing whitespace using the default cutset
# [[processors.strings.trim]]
# field = "message"
## Trim leading characters in cutset
# [[processors.strings.trim_left]]
# field = "message"
# cutset = "\t"
## Trim trailing characters in cutset
# [[processors.strings.trim_right]]
# field = "message"
# cutset = "\r\n"
## Trim the given prefix from the field
# [[processors.strings.trim_prefix]]
# field = "my_value"
# prefix = "my_"
## Trim the given suffix from the field
# [[processors.strings.trim_suffix]]
# field = "read_count"
# suffix = "_count"
`
func (s *Strings) SampleConfig() string {
return sampleConfig
}
func (s *Strings) Description() string {
return "Perform string processing on tags, fields, and measurements"
}
func (c *converter) convertTag(metric telegraf.Metric) {
tv, ok := metric.GetTag(c.Tag)
if !ok {
return
}
dest := c.Tag
if c.Dest != "" {
dest = c.Dest
}
metric.AddTag(dest, c.fn(tv))
}
func (c *converter) convertField(metric telegraf.Metric) {
fv, ok := metric.GetField(c.Field)
if !ok {
return
}
dest := c.Field
if c.Dest != "" {
dest = c.Dest
}
if fv, ok := fv.(string); ok {
metric.AddField(dest, c.fn(fv))
}
}
func (c *converter) convertMeasurement(metric telegraf.Metric) {
if metric.Name() != c.Measurement {
return
}
metric.SetName(c.fn(metric.Name()))
}
func (c *converter) convert(metric telegraf.Metric) {
if c.Field != "" {
c.convertField(metric)
}
if c.Tag != "" {
c.convertTag(metric)
}
if c.Measurement != "" {
c.convertMeasurement(metric)
}
}
func (s *Strings) initOnce() {
if s.init {
return
}
s.converters = make([]converter, 0)
for _, c := range s.Lowercase {
c.fn = strings.ToLower
s.converters = append(s.converters, c)
}
for _, c := range s.Uppercase {
c.fn = strings.ToUpper
s.converters = append(s.converters, c)
}
for _, c := range s.Trim {
if c.Cutset != "" {
c.fn = func(s string) string { return strings.Trim(s, c.Cutset) }
} else {
c.fn = func(s string) string { return strings.TrimFunc(s, unicode.IsSpace) }
}
s.converters = append(s.converters, c)
}
for _, c := range s.TrimLeft {
if c.Cutset != "" {
c.fn = func(s string) string { return strings.TrimLeft(s, c.Cutset) }
} else {
c.fn = func(s string) string { return strings.TrimLeftFunc(s, unicode.IsSpace) }
}
s.converters = append(s.converters, c)
}
for _, c := range s.TrimRight {
if c.Cutset != "" {
c.fn = func(s string) string { return strings.TrimRight(s, c.Cutset) }
} else {
c.fn = func(s string) string { return strings.TrimRightFunc(s, unicode.IsSpace) }
}
s.converters = append(s.converters, c)
}
for _, c := range s.TrimPrefix {
c.fn = func(s string) string { return strings.TrimPrefix(s, c.Prefix) }
s.converters = append(s.converters, c)
}
for _, c := range s.TrimSuffix {
c.fn = func(s string) string { return strings.TrimSuffix(s, c.Suffix) }
s.converters = append(s.converters, c)
}
s.init = true
}
func (s *Strings) Apply(in ...telegraf.Metric) []telegraf.Metric {
s.initOnce()
for _, metric := range in {
for _, converter := range s.converters {
converter.convert(metric)
}
}
return in
}
func init() {
processors.Add("strings", func() telegraf.Processor {
return &Strings{}
})
}

View File

@ -0,0 +1,483 @@
package strings
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newM1() telegraf.Metric {
m1, _ := metric.New("IIS_log",
map[string]string{
"verb": "GET",
"s-computername": "MIXEDCASE_hostname",
},
map[string]interface{}{
"request": "/mixed/CASE/paTH/?from=-1D&to=now",
"whitespace": " whitespace\t",
},
time.Now(),
)
return m1
}
func newM2() telegraf.Metric {
m2, _ := metric.New("IIS_log",
map[string]string{
"verb": "GET",
"resp_code": "200",
"s-computername": "MIXEDCASE_hostname",
},
map[string]interface{}{
"request": "/mixed/CASE/paTH/?from=-1D&to=now",
"cs-host": "AAAbbb",
"ignore_number": int64(200),
"ignore_bool": true,
},
time.Now(),
)
return m2
}
func TestFieldConversions(t *testing.T) {
tests := []struct {
name string
plugin *Strings
check func(t *testing.T, actual telegraf.Metric)
}{
{
name: "Should change existing field to lowercase",
plugin: &Strings{
Lowercase: []converter{
converter{
Field: "request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/case/path/?from=-1d&to=now", fv)
},
},
{
name: "Should change existing field to uppercase",
plugin: &Strings{
Uppercase: []converter{
converter{
Field: "request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/MIXED/CASE/PATH/?FROM=-1D&TO=NOW", fv)
},
},
{
name: "Should add new lowercase field",
plugin: &Strings{
Lowercase: []converter{
converter{
Field: "request",
Dest: "lowercase_request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
fv, ok = actual.GetField("lowercase_request")
require.True(t, ok)
require.Equal(t, "/mixed/case/path/?from=-1d&to=now", fv)
},
},
{
name: "Should trim from both sides",
plugin: &Strings{
Trim: []converter{
converter{
Field: "request",
Cutset: "/w",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "mixed/CASE/paTH/?from=-1D&to=no", fv)
},
},
{
name: "Should trim from both sides and make lowercase",
plugin: &Strings{
Trim: []converter{
converter{
Field: "request",
Cutset: "/w",
},
},
Lowercase: []converter{
converter{
Field: "request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "mixed/case/path/?from=-1d&to=no", fv)
},
},
{
name: "Should trim from left side",
plugin: &Strings{
TrimLeft: []converter{
converter{
Field: "request",
Cutset: "/w",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should trim from right side",
plugin: &Strings{
TrimRight: []converter{
converter{
Field: "request",
Cutset: "/w",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=no", fv)
},
},
{
name: "Should trim prefix '/mixed'",
plugin: &Strings{
TrimPrefix: []converter{
converter{
Field: "request",
Prefix: "/mixed",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should trim suffix '-1D&to=now'",
plugin: &Strings{
TrimSuffix: []converter{
converter{
Field: "request",
Suffix: "-1D&to=now",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=", fv)
},
},
{
name: "Trim without cutset removes whitespace",
plugin: &Strings{
Trim: []converter{
converter{
Field: "whitespace",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("whitespace")
require.True(t, ok)
require.Equal(t, "whitespace", fv)
},
},
{
name: "Trim left without cutset removes whitespace",
plugin: &Strings{
TrimLeft: []converter{
converter{
Field: "whitespace",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("whitespace")
require.True(t, ok)
require.Equal(t, "whitespace\t", fv)
},
},
{
name: "Trim right without cutset removes whitespace",
plugin: &Strings{
TrimRight: []converter{
converter{
Field: "whitespace",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("whitespace")
require.True(t, ok)
require.Equal(t, " whitespace", fv)
},
},
{
name: "No change if field missing",
plugin: &Strings{
Lowercase: []converter{
converter{
Field: "xyzzy",
Suffix: "-1D&to=now",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.plugin.Apply(newM1())
require.Len(t, metrics, 1)
tt.check(t, metrics[0])
})
}
}
func TestTagConversions(t *testing.T) {
tests := []struct {
name string
plugin *Strings
check func(t *testing.T, actual telegraf.Metric)
}{
{
name: "Should change existing tag to lowercase",
plugin: &Strings{
Lowercase: []converter{
converter{
Tag: "s-computername",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("s-computername")
require.True(t, ok)
require.Equal(t, "mixedcase_hostname", tv)
},
},
{
name: "Should add new lowercase tag",
plugin: &Strings{
Lowercase: []converter{
converter{
Tag: "s-computername",
Dest: "s-computername_lowercase",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("s-computername")
require.True(t, ok)
require.Equal(t, "MIXEDCASE_hostname", tv)
tv, ok = actual.GetTag("s-computername_lowercase")
require.True(t, ok)
require.Equal(t, "mixedcase_hostname", tv)
},
},
{
name: "Should add new uppercase tag",
plugin: &Strings{
Uppercase: []converter{
converter{
Tag: "s-computername",
Dest: "s-computername_uppercase",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("s-computername")
require.True(t, ok)
require.Equal(t, "MIXEDCASE_hostname", tv)
tv, ok = actual.GetTag("s-computername_uppercase")
require.True(t, ok)
require.Equal(t, "MIXEDCASE_HOSTNAME", tv)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.plugin.Apply(newM1())
require.Len(t, metrics, 1)
tt.check(t, metrics[0])
})
}
}
func TestMeasurementConversions(t *testing.T) {
tests := []struct {
name string
plugin *Strings
check func(t *testing.T, actual telegraf.Metric)
}{
{
name: "lowercase measurement",
plugin: &Strings{
Lowercase: []converter{
converter{
Measurement: "IIS_log",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
name := actual.Name()
require.Equal(t, "iis_log", name)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.plugin.Apply(newM1())
require.Len(t, metrics, 1)
tt.check(t, metrics[0])
})
}
}
func TestMultipleConversions(t *testing.T) {
plugin := &Strings{
Lowercase: []converter{
converter{
Tag: "s-computername",
},
converter{
Field: "request",
},
converter{
Field: "cs-host",
Dest: "cs-host_lowercase",
},
},
Uppercase: []converter{
converter{
Tag: "verb",
},
},
}
processed := plugin.Apply(newM2())
expectedFields := map[string]interface{}{
"request": "/mixed/case/path/?from=-1d&to=now",
"ignore_number": int64(200),
"ignore_bool": true,
"cs-host": "AAAbbb",
"cs-host_lowercase": "aaabbb",
}
expectedTags := map[string]string{
"verb": "GET",
"resp_code": "200",
"s-computername": "mixedcase_hostname",
}
assert.Equal(t, expectedFields, processed[0].Fields())
assert.Equal(t, expectedTags, processed[0].Tags())
}
func TestReadmeExample(t *testing.T) {
plugin := &Strings{
Lowercase: []converter{
converter{
Tag: "uri_stem",
},
},
TrimPrefix: []converter{
converter{
Tag: "uri_stem",
Prefix: "/api/",
},
},
Uppercase: []converter{
converter{
Field: "cs-host",
Dest: "cs-host_normalised",
},
},
}
m, _ := metric.New("iis_log",
map[string]string{
"verb": "get",
"uri_stem": "/API/HealthCheck",
},
map[string]interface{}{
"cs-host": "MIXEDCASE_host",
"referrer": "-",
"ident": "-",
"http_version": "1.1",
"agent": "UserAgent",
"resp_bytes": int64(270),
},
time.Now(),
)
processed := plugin.Apply(m)
expectedTags := map[string]string{
"verb": "get",
"uri_stem": "healthcheck",
}
expectedFields := map[string]interface{}{
"cs-host": "MIXEDCASE_host",
"cs-host_normalised": "MIXEDCASE_HOST",
"referrer": "-",
"ident": "-",
"http_version": "1.1",
"agent": "UserAgent",
"resp_bytes": int64(270),
}
assert.Equal(t, expectedFields, processed[0].Fields())
assert.Equal(t, expectedTags, processed[0].Tags())
}