Add regex processor plugin (#3839)
This commit is contained in:
parent
1148fd3563
commit
9294e82009
|
@ -3,5 +3,6 @@ package all
|
||||||
import (
|
import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
# Regex Processor Plugin
|
||||||
|
|
||||||
|
The `regex` plugin transforms tag and field values with regex pattern. If `result_key` parameter is present, it can produce new tags and fields from existing ones.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.regex]]
|
||||||
|
namepass = ["nginx_requests"]
|
||||||
|
|
||||||
|
# Tag and field conversions defined in a separate sub-tables
|
||||||
|
[[processors.regex.tags]]
|
||||||
|
## Tag to change
|
||||||
|
key = "resp_code"
|
||||||
|
## Regular expression to match on a tag value
|
||||||
|
pattern = "^(\\d)\\d\\d$"
|
||||||
|
## Pattern for constructing a new value (${1} represents first subgroup)
|
||||||
|
replacement = "${1}xx"
|
||||||
|
|
||||||
|
[[processors.regex.fields]]
|
||||||
|
key = "request"
|
||||||
|
## All the power of the Go regular expressions available here
|
||||||
|
## For example, named subgroups
|
||||||
|
pattern = "^/api(?P<method>/[\\w/]+)\\S*"
|
||||||
|
replacement = "${method}"
|
||||||
|
## If result_key is present, a new field will be created
|
||||||
|
## instead of changing existing field
|
||||||
|
result_key = "method"
|
||||||
|
|
||||||
|
# Multiple conversions may be applied for one field sequentially
|
||||||
|
# Let's extract one more value
|
||||||
|
[[processors.regex.fields]]
|
||||||
|
key = "request"
|
||||||
|
pattern = ".*category=(\\w+).*"
|
||||||
|
replacement = "${1}"
|
||||||
|
result_key = "search_category"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
No tags are applied by this processor.
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
```
|
||||||
|
nginx_requests,verb=GET,resp_code=2xx request="/api/search/?category=plugins&q=regex&sort=asc",method="/search/",search_category="plugins",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||||
|
```
|
|
@ -0,0 +1,110 @@
|
||||||
|
package regex
|
||||||
|
|
||||||
|
import (
|
||||||
|
"regexp"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Regex struct {
|
||||||
|
Tags []converter
|
||||||
|
Fields []converter
|
||||||
|
regexCache map[string]*regexp.Regexp
|
||||||
|
}
|
||||||
|
|
||||||
|
type converter struct {
|
||||||
|
Key string
|
||||||
|
Pattern string
|
||||||
|
Replacement string
|
||||||
|
ResultKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## Tag and field conversions defined in a separate sub-tables
|
||||||
|
# [[processors.regex.tags]]
|
||||||
|
# ## Tag to change
|
||||||
|
# key = "resp_code"
|
||||||
|
# ## Regular expression to match on a tag value
|
||||||
|
# pattern = "^(\\d)\\d\\d$"
|
||||||
|
# ## Pattern for constructing a new value (${1} represents first subgroup)
|
||||||
|
# replacement = "${1}xx"
|
||||||
|
|
||||||
|
# [[processors.regex.fields]]
|
||||||
|
# key = "request"
|
||||||
|
# ## All the power of the Go regular expressions available here
|
||||||
|
# ## For example, named subgroups
|
||||||
|
# pattern = "^/api(?P<method>/[\\w/]+)\\S*"
|
||||||
|
# replacement = "${method}"
|
||||||
|
# ## If result_key is present, a new field will be created
|
||||||
|
# ## instead of changing existing field
|
||||||
|
# result_key = "method"
|
||||||
|
|
||||||
|
## Multiple conversions may be applied for one field sequentially
|
||||||
|
## Let's extract one more value
|
||||||
|
# [[processors.regex.fields]]
|
||||||
|
# key = "request"
|
||||||
|
# pattern = ".*category=(\\w+).*"
|
||||||
|
# replacement = "${1}"
|
||||||
|
# result_key = "search_category"
|
||||||
|
`
|
||||||
|
|
||||||
|
func NewRegex() *Regex {
|
||||||
|
return &Regex{
|
||||||
|
regexCache: make(map[string]*regexp.Regexp),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Regex) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Regex) Description() string {
|
||||||
|
return "Transforms tag and field values with regex pattern"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
for _, metric := range in {
|
||||||
|
for _, converter := range r.Tags {
|
||||||
|
if value, ok := metric.GetTag(converter.Key); ok {
|
||||||
|
metric.AddTag(r.convert(converter, value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, converter := range r.Fields {
|
||||||
|
if value, ok := metric.GetField(converter.Key); ok {
|
||||||
|
switch value := value.(type) {
|
||||||
|
case string:
|
||||||
|
metric.AddField(r.convert(converter, value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Regex) convert(c converter, src string) (string, string) {
|
||||||
|
regex, compiled := r.regexCache[c.Pattern]
|
||||||
|
if !compiled {
|
||||||
|
regex = regexp.MustCompile(c.Pattern)
|
||||||
|
r.regexCache[c.Pattern] = regex
|
||||||
|
}
|
||||||
|
|
||||||
|
value := ""
|
||||||
|
if c.ResultKey == "" || regex.MatchString(src) {
|
||||||
|
value = regex.ReplaceAllString(src, c.Replacement)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.ResultKey != "" {
|
||||||
|
return c.ResultKey, value
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Key, value
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("regex", func() telegraf.Processor {
|
||||||
|
return NewRegex()
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,273 @@
|
||||||
|
package regex
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newM1() telegraf.Metric {
|
||||||
|
m1, _ := metric.New("access_log",
|
||||||
|
map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "200",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
return m1
|
||||||
|
}
|
||||||
|
|
||||||
|
func newM2() telegraf.Metric {
|
||||||
|
m2, _ := metric.New("access_log",
|
||||||
|
map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "200",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"request": "/api/search/?category=plugins&q=regex&sort=asc",
|
||||||
|
"ignore_number": int64(200),
|
||||||
|
"ignore_bool": true,
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
return m2
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFieldConversions(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
message string
|
||||||
|
converter converter
|
||||||
|
expectedFields map[string]interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
message: "Should change existing field",
|
||||||
|
converter: converter{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "^/users/\\d+/$",
|
||||||
|
Replacement: "/users/{id}/",
|
||||||
|
},
|
||||||
|
expectedFields: map[string]interface{}{
|
||||||
|
"request": "/users/{id}/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
message: "Should add new field",
|
||||||
|
converter: converter{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "^/users/\\d+/$",
|
||||||
|
Replacement: "/users/{id}/",
|
||||||
|
ResultKey: "normalized_request",
|
||||||
|
},
|
||||||
|
expectedFields: map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
"normalized_request": "/users/{id}/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
regex := NewRegex()
|
||||||
|
regex.Fields = []converter{
|
||||||
|
test.converter,
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := regex.Apply(newM1())
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "200",
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectedFields, processed[0].Fields(), test.message)
|
||||||
|
assert.Equal(t, expectedTags, processed[0].Tags(), "Should not change tags")
|
||||||
|
assert.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTagConversions(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
message string
|
||||||
|
converter converter
|
||||||
|
expectedTags map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
message: "Should change existing tag",
|
||||||
|
converter: converter{
|
||||||
|
Key: "resp_code",
|
||||||
|
Pattern: "^(\\d)\\d\\d$",
|
||||||
|
Replacement: "${1}xx",
|
||||||
|
},
|
||||||
|
expectedTags: map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "2xx",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
message: "Should add new tag",
|
||||||
|
converter: converter{
|
||||||
|
Key: "resp_code",
|
||||||
|
Pattern: "^(\\d)\\d\\d$",
|
||||||
|
Replacement: "${1}xx",
|
||||||
|
ResultKey: "resp_code_group",
|
||||||
|
},
|
||||||
|
expectedTags: map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "200",
|
||||||
|
"resp_code_group": "2xx",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
regex := NewRegex()
|
||||||
|
regex.Tags = []converter{
|
||||||
|
test.converter,
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := regex.Apply(newM1())
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, expectedFields, processed[0].Fields(), test.message, "Should not change fields")
|
||||||
|
assert.Equal(t, test.expectedTags, processed[0].Tags(), test.message)
|
||||||
|
assert.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultipleConversions(t *testing.T) {
|
||||||
|
regex := NewRegex()
|
||||||
|
regex.Tags = []converter{
|
||||||
|
{
|
||||||
|
Key: "resp_code",
|
||||||
|
Pattern: "^(\\d)\\d\\d$",
|
||||||
|
Replacement: "${1}xx",
|
||||||
|
ResultKey: "resp_code_group",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "resp_code_group",
|
||||||
|
Pattern: "2xx",
|
||||||
|
Replacement: "OK",
|
||||||
|
ResultKey: "resp_code_text",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
regex.Fields = []converter{
|
||||||
|
{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "^/api(?P<method>/[\\w/]+)\\S*",
|
||||||
|
Replacement: "${method}",
|
||||||
|
ResultKey: "method",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: ".*category=(\\w+).*",
|
||||||
|
Replacement: "${1}",
|
||||||
|
ResultKey: "search_category",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := regex.Apply(newM2())
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"request": "/api/search/?category=plugins&q=regex&sort=asc",
|
||||||
|
"method": "/search/",
|
||||||
|
"search_category": "plugins",
|
||||||
|
"ignore_number": int64(200),
|
||||||
|
"ignore_bool": true,
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"verb": "GET",
|
||||||
|
"resp_code": "200",
|
||||||
|
"resp_code_group": "2xx",
|
||||||
|
"resp_code_text": "OK",
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, expectedFields, processed[0].Fields())
|
||||||
|
assert.Equal(t, expectedTags, processed[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoMatches(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
message string
|
||||||
|
converter converter
|
||||||
|
expectedFields map[string]interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
message: "Should not change anything if there is no field with given key",
|
||||||
|
converter: converter{
|
||||||
|
Key: "not_exists",
|
||||||
|
Pattern: "\\.*",
|
||||||
|
Replacement: "x",
|
||||||
|
},
|
||||||
|
expectedFields: map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
message: "Should not change anything if regex doesn't match",
|
||||||
|
converter: converter{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "not_match",
|
||||||
|
Replacement: "x",
|
||||||
|
},
|
||||||
|
expectedFields: map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
message: "Should emit empty string when result_key given but regex doesn't match",
|
||||||
|
converter: converter{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "not_match",
|
||||||
|
Replacement: "x",
|
||||||
|
ResultKey: "new_field",
|
||||||
|
},
|
||||||
|
expectedFields: map[string]interface{}{
|
||||||
|
"request": "/users/42/",
|
||||||
|
"new_field": "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
regex := NewRegex()
|
||||||
|
regex.Fields = []converter{
|
||||||
|
test.converter,
|
||||||
|
}
|
||||||
|
|
||||||
|
processed := regex.Apply(newM1())
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectedFields, processed[0].Fields(), test.message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkConversions(b *testing.B) {
|
||||||
|
regex := NewRegex()
|
||||||
|
regex.Tags = []converter{
|
||||||
|
{
|
||||||
|
Key: "resp_code",
|
||||||
|
Pattern: "^(\\d)\\d\\d$",
|
||||||
|
Replacement: "${1}xx",
|
||||||
|
ResultKey: "resp_code_group",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
regex.Fields = []converter{
|
||||||
|
{
|
||||||
|
Key: "request",
|
||||||
|
Pattern: "^/users/\\d+/$",
|
||||||
|
Replacement: "/users/{id}/",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
processed := regex.Apply(newM1())
|
||||||
|
_ = processed
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue