Add support for field/tag keys to strings processor (#6129)

This commit is contained in:
Greg 2019-07-19 14:16:54 -06:00 committed by Daniel Nelson
parent 77b1a43539
commit bdb4598b3f
3 changed files with 411 additions and 10 deletions

View File

@ -14,41 +14,50 @@ Implemented functions are:
Please note that in this implementation these are processed in the order that they appear above. Please note that in this implementation these are processed in the order that they appear above.
Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor. Specify the `measurement`, `tag`, `tag_key`, `field`, or `field_key` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor.
If you'd like to apply the change to every `tag`, `field`, or `measurement`, use the value "*" for each respective field. Note that the `dest` field will be ignored if "*" is used If you'd like to apply the change to every `tag`, `tag_key`, `field`, `field_key`, or `measurement`, use the value `"*"` for each respective field. Note that the `dest` field will be ignored if `"*"` is used.
If you'd like to apply multiple processings to the same `tag_key` or `field_key`, note the process order stated above. See [Example 2]() for an example.
### Configuration: ### Configuration:
```toml ```toml
[[processors.strings]] [[processors.strings]]
# [[processors.strings.uppercase]] ## Convert a field value to lowercase and store in a new field
# tag = "method"
# [[processors.strings.lowercase]] # [[processors.strings.lowercase]]
# field = "uri_stem" # field = "uri_stem"
# dest = "uri_stem_normalised" # dest = "uri_stem_normalised"
## Convert a tag value to lowercase ## Convert a tag value to uppercase
# [[processors.strings.uppercase]]
# tag = "method"
## Trim leading and trailing whitespace using the default cutset
# [[processors.strings.trim]] # [[processors.strings.trim]]
# field = "message" # field = "message"
## Trim leading characters in cutset
# [[processors.strings.trim_left]] # [[processors.strings.trim_left]]
# field = "message" # field = "message"
# cutset = "\t" # cutset = "\t"
## Trim trailing characters in cutset
# [[processors.strings.trim_right]] # [[processors.strings.trim_right]]
# field = "message" # field = "message"
# cutset = "\r\n" # cutset = "\r\n"
## Trim the given prefix from the field
# [[processors.strings.trim_prefix]] # [[processors.strings.trim_prefix]]
# field = "my_value" # field = "my_value"
# prefix = "my_" # prefix = "my_"
## Trim the given suffix from the field
# [[processors.strings.trim_suffix]] # [[processors.strings.trim_suffix]]
# field = "read_count" # field = "read_count"
# suffix = "_count" # suffix = "_count"
## Replace all non-overlapping instances of old with new
# [[processors.strings.replace]] # [[processors.strings.replace]]
# measurement = "*" # measurement = "*"
# old = ":" # old = ":"
@ -79,10 +88,10 @@ the operation and keep the old name.
```toml ```toml
[[processors.strings]] [[processors.strings]]
[[processors.strings.lowercase]] [[processors.strings.lowercase]]
field = "uri-stem" tag = "uri_stem"
[[processors.strings.trim_prefix]] [[processors.strings.trim_prefix]]
field = "uri_stem" tag = "uri_stem"
prefix = "/api/" prefix = "/api/"
[[processors.strings.uppercase]] [[processors.strings.uppercase]]
@ -92,10 +101,33 @@ the operation and keep the old name.
**Input** **Input**
``` ```
iis_log,method=get,uri_stem=/API/HealthCheck cs-host="MIXEDCASE_host",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000 iis_log,method=get,uri_stem=/API/HealthCheck cs-host="MIXEDCASE_host",http_version=1.1 1519652321000000000
``` ```
**Output** **Output**
``` ```
iis_log,method=get,uri_stem=healthcheck cs-host="MIXEDCASE_host",cs-host_normalised="MIXEDCASE_HOST",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000 iis_log,method=get,uri_stem=healthcheck cs-host="MIXEDCASE_host",http_version=1.1,cs-host_normalised="MIXEDCASE_HOST" 1519652321000000000
```
### Example 2
**Config**
```toml
[[processors.strings]]
[[processors.strings.lowercase]]
tag_key = "URI-Stem"
[[processors.strings.replace]]
tag_key = "uri-stem"
old = "-"
new = "_"
```
**Input**
```
iis_log,URI-Stem=/API/HealthCheck http_version=1.1 1519652321000000000
```
**Output**
```
iis_log,uri_stem=/API/HealthCheck http_version=1.1 1519652321000000000
``` ```

View File

@ -26,7 +26,9 @@ type ConvertFunc func(s string) string
type converter struct { type converter struct {
Field string Field string
FieldKey string
Tag string Tag string
TagKey string
Measurement string Measurement string
Dest string Dest string
Cutset string Cutset string
@ -109,6 +111,27 @@ func (c *converter) convertTag(metric telegraf.Metric) {
} }
} }
func (c *converter) convertTagKey(metric telegraf.Metric) {
var tags map[string]string
if c.TagKey == "*" {
tags = metric.Tags()
} else {
tags = make(map[string]string)
tv, ok := metric.GetTag(c.TagKey)
if !ok {
return
}
tags[c.TagKey] = tv
}
for key, value := range tags {
if k := c.fn(key); k != "" {
metric.RemoveTag(key)
metric.AddTag(k, value)
}
}
}
func (c *converter) convertField(metric telegraf.Metric) { func (c *converter) convertField(metric telegraf.Metric) {
var fields map[string]interface{} var fields map[string]interface{}
if c.Field == "*" { if c.Field == "*" {
@ -133,6 +156,27 @@ func (c *converter) convertField(metric telegraf.Metric) {
} }
} }
func (c *converter) convertFieldKey(metric telegraf.Metric) {
var fields map[string]interface{}
if c.FieldKey == "*" {
fields = metric.Fields()
} else {
fields = make(map[string]interface{})
fv, ok := metric.GetField(c.FieldKey)
if !ok {
return
}
fields[c.FieldKey] = fv
}
for key, value := range fields {
if k := c.fn(key); k != "" {
metric.RemoveField(key)
metric.AddField(k, value)
}
}
}
func (c *converter) convertMeasurement(metric telegraf.Metric) { func (c *converter) convertMeasurement(metric telegraf.Metric) {
if metric.Name() != c.Measurement && c.Measurement != "*" { if metric.Name() != c.Measurement && c.Measurement != "*" {
return return
@ -146,10 +190,18 @@ func (c *converter) convert(metric telegraf.Metric) {
c.convertField(metric) c.convertField(metric)
} }
if c.FieldKey != "" {
c.convertFieldKey(metric)
}
if c.Tag != "" { if c.Tag != "" {
c.convertTag(metric) c.convertTag(metric)
} }
if c.TagKey != "" {
c.convertTagKey(metric)
}
if c.Measurement != "" { if c.Measurement != "" {
c.convertMeasurement(metric) c.convertMeasurement(metric)
} }

View File

@ -25,6 +25,22 @@ func newM1() telegraf.Metric {
return m1 return m1
} }
func newM2() telegraf.Metric {
m1, _ := metric.New("IIS_log",
map[string]string{
"verb": "GET",
"S-ComputerName": "MIXEDCASE_hostname",
},
map[string]interface{}{
"Request": "/mixed/CASE/paTH/?from=-1D&to=now",
"req/sec": 5,
" whitespace ": " whitespace\t",
},
time.Now(),
)
return m1
}
func TestFieldConversions(t *testing.T) { func TestFieldConversions(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -253,6 +269,226 @@ func TestFieldConversions(t *testing.T) {
} }
} }
func TestFieldKeyConversions(t *testing.T) {
tests := []struct {
name string
plugin *Strings
check func(t *testing.T, actual telegraf.Metric)
}{
{
name: "Should change existing field key to lowercase",
plugin: &Strings{
Lowercase: []converter{
{
FieldKey: "Request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should change existing field key to uppercase",
plugin: &Strings{
Uppercase: []converter{
{
FieldKey: "Request",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("Request")
require.False(t, ok)
fv, ok = actual.GetField("REQUEST")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should trim from both sides",
plugin: &Strings{
Trim: []converter{
{
FieldKey: "Request",
Cutset: "eR",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("quest")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should trim from both sides but not make lowercase",
plugin: &Strings{
// Tag/field key multiple executions occur in the following order: (initOnce)
// Lowercase
// Uppercase
// Trim
// TrimLeft
// TrimRight
// TrimPrefix
// TrimSuffix
// Replace
Lowercase: []converter{
{
FieldKey: "Request",
},
},
Trim: []converter{
{
FieldKey: "request",
Cutset: "tse",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("requ")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
{
name: "Should trim from left side",
plugin: &Strings{
TrimLeft: []converter{
{
FieldKey: "req/sec",
Cutset: "req/",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("sec")
require.True(t, ok)
require.Equal(t, int64(5), fv)
},
},
{
name: "Should trim from right side",
plugin: &Strings{
TrimRight: []converter{
{
FieldKey: "req/sec",
Cutset: "req/",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("req/sec")
require.True(t, ok)
require.Equal(t, int64(5), fv)
},
},
{
name: "Should trim prefix 'req/'",
plugin: &Strings{
TrimPrefix: []converter{
{
FieldKey: "req/sec",
Prefix: "req/",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("sec")
require.True(t, ok)
require.Equal(t, int64(5), fv)
},
},
{
name: "Should trim suffix '/sec'",
plugin: &Strings{
TrimSuffix: []converter{
{
FieldKey: "req/sec",
Suffix: "/sec",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("req")
require.True(t, ok)
require.Equal(t, int64(5), fv)
},
},
{
name: "Trim without cutset removes whitespace",
plugin: &Strings{
Trim: []converter{
{
FieldKey: " whitespace ",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("whitespace")
require.True(t, ok)
require.Equal(t, " whitespace\t", fv)
},
},
{
name: "Trim left without cutset removes whitespace",
plugin: &Strings{
TrimLeft: []converter{
{
FieldKey: " whitespace ",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("whitespace ")
require.True(t, ok)
require.Equal(t, " whitespace\t", fv)
},
},
{
name: "Trim right without cutset removes whitespace",
plugin: &Strings{
TrimRight: []converter{
{
FieldKey: " whitespace ",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField(" whitespace")
require.True(t, ok)
require.Equal(t, " whitespace\t", fv)
},
},
{
name: "No change if field missing",
plugin: &Strings{
Lowercase: []converter{
{
FieldKey: "xyzzy",
Suffix: "-1D&to=now",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
fv, ok := actual.GetField("Request")
require.True(t, ok)
require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.plugin.Apply(newM2())
require.Len(t, metrics, 1)
tt.check(t, metrics[0])
})
}
}
func TestTagConversions(t *testing.T) { func TestTagConversions(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -337,6 +573,87 @@ func TestTagConversions(t *testing.T) {
} }
} }
func TestTagKeyConversions(t *testing.T) {
tests := []struct {
name string
plugin *Strings
check func(t *testing.T, actual telegraf.Metric)
}{
{
name: "Should change existing tag key to lowercase",
plugin: &Strings{
Lowercase: []converter{
{
Tag: "S-ComputerName",
TagKey: "S-ComputerName",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("s-computername")
require.True(t, ok)
require.Equal(t, "mixedcase_hostname", tv)
},
},
{
name: "Should add new lowercase tag key",
plugin: &Strings{
Lowercase: []converter{
{
TagKey: "S-ComputerName",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("S-ComputerName")
require.False(t, ok)
tv, ok = actual.GetTag("s-computername")
require.True(t, ok)
require.Equal(t, "MIXEDCASE_hostname", tv)
},
},
{
name: "Should add new uppercase tag key",
plugin: &Strings{
Uppercase: []converter{
{
TagKey: "S-ComputerName",
},
},
},
check: func(t *testing.T, actual telegraf.Metric) {
tv, ok := actual.GetTag("verb")
require.True(t, ok)
require.Equal(t, "GET", tv)
tv, ok = actual.GetTag("S-ComputerName")
require.False(t, ok)
tv, ok = actual.GetTag("S-COMPUTERNAME")
require.True(t, ok)
require.Equal(t, "MIXEDCASE_hostname", tv)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics := tt.plugin.Apply(newM2())
require.Len(t, metrics, 1)
tt.check(t, metrics[0])
})
}
}
func TestMeasurementConversions(t *testing.T) { func TestMeasurementConversions(t *testing.T) {
tests := []struct { tests := []struct {
name string name string