Add replace function to strings processor (#4686)

This commit is contained in:
Bo Zhao 2018-09-26 19:30:02 -07:00 committed by Daniel Nelson
parent 0a8301ec3c
commit 8cbd39501b
3 changed files with 122 additions and 19 deletions

View File

@ -10,11 +10,14 @@ Implemented functions are:
- trim_right - trim_right
- trim_prefix - trim_prefix
- trim_suffix - trim_suffix
- replace
Please note that in this implementation these are processed in the order that they appear above. Please note that in this implementation these are processed in the order that they appear above.
Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor. Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor.
If you'd like to apply the change to every `tag`, `field`, or `measurement`, use the value "*" for each respective field. Note that the `dest` field will be ignored if "*" is used
### Configuration: ### Configuration:
```toml ```toml
@ -45,6 +48,11 @@ Specify the `measurement`, `tag` or `field` that you want processed in each sect
# [[processors.strings.trim_suffix]] # [[processors.strings.trim_suffix]]
# field = "read_count" # field = "read_count"
# suffix = "_count" # suffix = "_count"
# [[processors.strings.replace]]
# measurement = "*"
# old = ":"
# new = "_"
``` ```
#### Trim, TrimLeft, TrimRight #### Trim, TrimLeft, TrimRight
@ -56,6 +64,16 @@ The `trim`, `trim_left`, and `trim_right` functions take an optional parameter:
The `trim_prefix` and `trim_suffix` functions remote the given `prefix` or `suffix` The `trim_prefix` and `trim_suffix` functions remote the given `prefix` or `suffix`
respectively from the string. respectively from the string.
#### Replace
The `replace` function does a substring replacement across the entire
string to allow for different conventions between various input and output
plugins. Some example usages are eliminating disallowed characters in
field names or replacing separators between different separators.
Can also be used to eliminate unneeded chars that were in metrics.
If the entire name would be deleted, it will refuse to perform
the operation and keep the old name.
### Example ### Example
**Config** **Config**
```toml ```toml

View File

@ -16,6 +16,7 @@ type Strings struct {
TrimRight []converter `toml:"trim_right"` TrimRight []converter `toml:"trim_right"`
TrimPrefix []converter `toml:"trim_prefix"` TrimPrefix []converter `toml:"trim_prefix"`
TrimSuffix []converter `toml:"trim_suffix"` TrimSuffix []converter `toml:"trim_suffix"`
Replace []converter `toml:"replace"`
converters []converter converters []converter
init bool init bool
@ -31,6 +32,8 @@ type converter struct {
Cutset string Cutset string
Suffix string Suffix string
Prefix string Prefix string
Old string
New string
fn ConvertFunc fn ConvertFunc
} }
@ -68,6 +71,12 @@ const sampleConfig = `
# [[processors.strings.trim_suffix]] # [[processors.strings.trim_suffix]]
# field = "read_count" # field = "read_count"
# suffix = "_count" # suffix = "_count"
## Replace substrings within field names
# [[processors.strings.trim_suffix]]
# measurement = "*"
# old = ":"
# new = "_"
` `
func (s *Strings) SampleConfig() string { func (s *Strings) SampleConfig() string {
@ -79,37 +88,53 @@ func (s *Strings) Description() string {
} }
func (c *converter) convertTag(metric telegraf.Metric) { func (c *converter) convertTag(metric telegraf.Metric) {
tv, ok := metric.GetTag(c.Tag) var tags map[string]string
if !ok { if c.Tag == "*" {
return tags = metric.Tags()
} else {
tags = make(map[string]string)
tv, ok := metric.GetTag(c.Tag)
if !ok {
return
}
tags[c.Tag] = tv
} }
dest := c.Tag for tag, value := range tags {
if c.Dest != "" { dest := tag
dest = c.Dest if c.Tag != "*" && c.Dest != "" {
dest = c.Dest
}
metric.AddTag(dest, c.fn(value))
} }
metric.AddTag(dest, c.fn(tv))
} }
func (c *converter) convertField(metric telegraf.Metric) { func (c *converter) convertField(metric telegraf.Metric) {
fv, ok := metric.GetField(c.Field) var fields map[string]interface{}
if !ok { if c.Field == "*" {
return fields = metric.Fields()
} else {
fields = make(map[string]interface{})
fv, ok := metric.GetField(c.Field)
if !ok {
return
}
fields[c.Field] = fv
} }
dest := c.Field for tag, value := range fields {
if c.Dest != "" { dest := tag
dest = c.Dest if c.Tag != "*" && c.Dest != "" {
} dest = c.Dest
}
if fv, ok := fv.(string); ok { if fv, ok := value.(string); ok {
metric.AddField(dest, c.fn(fv)) metric.AddField(dest, c.fn(fv))
}
} }
} }
func (c *converter) convertMeasurement(metric telegraf.Metric) { func (c *converter) convertMeasurement(metric telegraf.Metric) {
if metric.Name() != c.Measurement { if metric.Name() != c.Measurement && c.Measurement != "*" {
return return
} }
@ -176,6 +201,17 @@ func (s *Strings) initOnce() {
c.fn = func(s string) string { return strings.TrimSuffix(s, c.Suffix) } c.fn = func(s string) string { return strings.TrimSuffix(s, c.Suffix) }
s.converters = append(s.converters, c) s.converters = append(s.converters, c)
} }
for _, c := range s.Replace {
c.fn = func(s string) string {
newString := strings.Replace(s, c.Old, c.New, -1)
if newString == "" {
return s
} else {
return newString
}
}
s.converters = append(s.converters, c)
}
s.init = true s.init = true
} }

View File

@ -481,3 +481,52 @@ func TestReadmeExample(t *testing.T) {
assert.Equal(t, expectedFields, processed[0].Fields()) assert.Equal(t, expectedFields, processed[0].Fields())
assert.Equal(t, expectedTags, processed[0].Tags()) assert.Equal(t, expectedTags, processed[0].Tags())
} }
func newMetric(name string) telegraf.Metric {
tags := map[string]string{}
fields := map[string]interface{}{}
m, _ := metric.New(name, tags, fields, time.Now())
return m
}
func TestMeasurementReplace(t *testing.T) {
plugin := &Strings{
Replace: []converter{
converter{
Old: "_",
New: "-",
Measurement: "*",
},
},
}
metrics := []telegraf.Metric{
newMetric("foo:some_value:bar"),
newMetric("average:cpu:usage"),
newMetric("average_cpu_usage"),
}
results := plugin.Apply(metrics...)
assert.Equal(t, "foo:some-value:bar", results[0].Name(), "`_` was not changed to `-`")
assert.Equal(t, "average:cpu:usage", results[1].Name(), "Input name should have been unchanged")
assert.Equal(t, "average-cpu-usage", results[2].Name(), "All instances of `_` should have been changed to `-`")
}
func TestMeasurementCharDeletion(t *testing.T) {
plugin := &Strings{
Replace: []converter{
converter{
Old: "foo",
New: "",
Measurement: "*",
},
},
}
metrics := []telegraf.Metric{
newMetric("foo:bar:baz"),
newMetric("foofoofoo"),
newMetric("barbarbar"),
}
results := plugin.Apply(metrics...)
assert.Equal(t, ":bar:baz", results[0].Name(), "Should have deleted the initial `foo`")
assert.Equal(t, "foofoofoo", results[1].Name(), "Should have refused to delete the whole string")
assert.Equal(t, "barbarbar", results[2].Name(), "Should not have changed the input")
}