From ccc4a85fd643ff8699b2d61b96fc20ad2addab92 Mon Sep 17 00:00:00 2001 From: Alexander Shepelin Date: Tue, 22 May 2018 01:46:10 +0300 Subject: [PATCH] Add regex processor plugin (#3839) --- plugins/processors/all/all.go | 1 + plugins/processors/regex/README.md | 46 +++++ plugins/processors/regex/regex.go | 110 ++++++++++ plugins/processors/regex/regex_test.go | 273 +++++++++++++++++++++++++ 4 files changed, 430 insertions(+) create mode 100644 plugins/processors/regex/README.md create mode 100644 plugins/processors/regex/regex.go create mode 100644 plugins/processors/regex/regex_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index f14014e29..c15f76391 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -3,5 +3,6 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/printer" + _ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/topk" ) diff --git a/plugins/processors/regex/README.md b/plugins/processors/regex/README.md new file mode 100644 index 000000000..c9eec037b --- /dev/null +++ b/plugins/processors/regex/README.md @@ -0,0 +1,46 @@ +# Regex Processor Plugin + +The `regex` plugin transforms tag and field values with regex pattern. If `result_key` parameter is present, it can produce new tags and fields from existing ones. + +### Configuration: + +```toml +[[processors.regex]] + namepass = ["nginx_requests"] + + # Tag and field conversions defined in a separate sub-tables + [[processors.regex.tags]] + ## Tag to change + key = "resp_code" + ## Regular expression to match on a tag value + pattern = "^(\\d)\\d\\d$" + ## Pattern for constructing a new value (${1} represents first subgroup) + replacement = "${1}xx" + + [[processors.regex.fields]] + key = "request" + ## All the power of the Go regular expressions available here + ## For example, named subgroups + pattern = "^/api(?P/[\\w/]+)\\S*" + replacement = "${method}" + ## If result_key is present, a new field will be created + ## instead of changing existing field + result_key = "method" + + # Multiple conversions may be applied for one field sequentially + # Let's extract one more value + [[processors.regex.fields]] + key = "request" + pattern = ".*category=(\\w+).*" + replacement = "${1}" + result_key = "search_category" +``` + +### Tags: + +No tags are applied by this processor. + +### Example Output: +``` +nginx_requests,verb=GET,resp_code=2xx request="/api/search/?category=plugins&q=regex&sort=asc",method="/search/",search_category="plugins",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000 +``` diff --git a/plugins/processors/regex/regex.go b/plugins/processors/regex/regex.go new file mode 100644 index 000000000..3282406c8 --- /dev/null +++ b/plugins/processors/regex/regex.go @@ -0,0 +1,110 @@ +package regex + +import ( + "regexp" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Regex struct { + Tags []converter + Fields []converter + regexCache map[string]*regexp.Regexp +} + +type converter struct { + Key string + Pattern string + Replacement string + ResultKey string +} + +const sampleConfig = ` + ## Tag and field conversions defined in a separate sub-tables + # [[processors.regex.tags]] + # ## Tag to change + # key = "resp_code" + # ## Regular expression to match on a tag value + # pattern = "^(\\d)\\d\\d$" + # ## Pattern for constructing a new value (${1} represents first subgroup) + # replacement = "${1}xx" + + # [[processors.regex.fields]] + # key = "request" + # ## All the power of the Go regular expressions available here + # ## For example, named subgroups + # pattern = "^/api(?P/[\\w/]+)\\S*" + # replacement = "${method}" + # ## If result_key is present, a new field will be created + # ## instead of changing existing field + # result_key = "method" + + ## Multiple conversions may be applied for one field sequentially + ## Let's extract one more value + # [[processors.regex.fields]] + # key = "request" + # pattern = ".*category=(\\w+).*" + # replacement = "${1}" + # result_key = "search_category" +` + +func NewRegex() *Regex { + return &Regex{ + regexCache: make(map[string]*regexp.Regexp), + } +} + +func (r *Regex) SampleConfig() string { + return sampleConfig +} + +func (r *Regex) Description() string { + return "Transforms tag and field values with regex pattern" +} + +func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + for _, converter := range r.Tags { + if value, ok := metric.GetTag(converter.Key); ok { + metric.AddTag(r.convert(converter, value)) + } + } + + for _, converter := range r.Fields { + if value, ok := metric.GetField(converter.Key); ok { + switch value := value.(type) { + case string: + metric.AddField(r.convert(converter, value)) + } + } + } + } + + return in +} + +func (r *Regex) convert(c converter, src string) (string, string) { + regex, compiled := r.regexCache[c.Pattern] + if !compiled { + regex = regexp.MustCompile(c.Pattern) + r.regexCache[c.Pattern] = regex + } + + value := "" + if c.ResultKey == "" || regex.MatchString(src) { + value = regex.ReplaceAllString(src, c.Replacement) + } + + if c.ResultKey != "" { + return c.ResultKey, value + } + + return c.Key, value +} + +func init() { + processors.Add("regex", func() telegraf.Processor { + return NewRegex() + }) +} diff --git a/plugins/processors/regex/regex_test.go b/plugins/processors/regex/regex_test.go new file mode 100644 index 000000000..e7c15e5aa --- /dev/null +++ b/plugins/processors/regex/regex_test.go @@ -0,0 +1,273 @@ +package regex + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +func newM1() telegraf.Metric { + m1, _ := metric.New("access_log", + map[string]string{ + "verb": "GET", + "resp_code": "200", + }, + map[string]interface{}{ + "request": "/users/42/", + }, + time.Now(), + ) + return m1 +} + +func newM2() telegraf.Metric { + m2, _ := metric.New("access_log", + map[string]string{ + "verb": "GET", + "resp_code": "200", + }, + map[string]interface{}{ + "request": "/api/search/?category=plugins&q=regex&sort=asc", + "ignore_number": int64(200), + "ignore_bool": true, + }, + time.Now(), + ) + return m2 +} + +func TestFieldConversions(t *testing.T) { + tests := []struct { + message string + converter converter + expectedFields map[string]interface{} + }{ + { + message: "Should change existing field", + converter: converter{ + Key: "request", + Pattern: "^/users/\\d+/$", + Replacement: "/users/{id}/", + }, + expectedFields: map[string]interface{}{ + "request": "/users/{id}/", + }, + }, + { + message: "Should add new field", + converter: converter{ + Key: "request", + Pattern: "^/users/\\d+/$", + Replacement: "/users/{id}/", + ResultKey: "normalized_request", + }, + expectedFields: map[string]interface{}{ + "request": "/users/42/", + "normalized_request": "/users/{id}/", + }, + }, + } + + for _, test := range tests { + regex := NewRegex() + regex.Fields = []converter{ + test.converter, + } + + processed := regex.Apply(newM1()) + + expectedTags := map[string]string{ + "verb": "GET", + "resp_code": "200", + } + + assert.Equal(t, test.expectedFields, processed[0].Fields(), test.message) + assert.Equal(t, expectedTags, processed[0].Tags(), "Should not change tags") + assert.Equal(t, "access_log", processed[0].Name(), "Should not change name") + } +} + +func TestTagConversions(t *testing.T) { + tests := []struct { + message string + converter converter + expectedTags map[string]string + }{ + { + message: "Should change existing tag", + converter: converter{ + Key: "resp_code", + Pattern: "^(\\d)\\d\\d$", + Replacement: "${1}xx", + }, + expectedTags: map[string]string{ + "verb": "GET", + "resp_code": "2xx", + }, + }, + { + message: "Should add new tag", + converter: converter{ + Key: "resp_code", + Pattern: "^(\\d)\\d\\d$", + Replacement: "${1}xx", + ResultKey: "resp_code_group", + }, + expectedTags: map[string]string{ + "verb": "GET", + "resp_code": "200", + "resp_code_group": "2xx", + }, + }, + } + + for _, test := range tests { + regex := NewRegex() + regex.Tags = []converter{ + test.converter, + } + + processed := regex.Apply(newM1()) + + expectedFields := map[string]interface{}{ + "request": "/users/42/", + } + + assert.Equal(t, expectedFields, processed[0].Fields(), test.message, "Should not change fields") + assert.Equal(t, test.expectedTags, processed[0].Tags(), test.message) + assert.Equal(t, "access_log", processed[0].Name(), "Should not change name") + } +} + +func TestMultipleConversions(t *testing.T) { + regex := NewRegex() + regex.Tags = []converter{ + { + Key: "resp_code", + Pattern: "^(\\d)\\d\\d$", + Replacement: "${1}xx", + ResultKey: "resp_code_group", + }, + { + Key: "resp_code_group", + Pattern: "2xx", + Replacement: "OK", + ResultKey: "resp_code_text", + }, + } + regex.Fields = []converter{ + { + Key: "request", + Pattern: "^/api(?P/[\\w/]+)\\S*", + Replacement: "${method}", + ResultKey: "method", + }, + { + Key: "request", + Pattern: ".*category=(\\w+).*", + Replacement: "${1}", + ResultKey: "search_category", + }, + } + + processed := regex.Apply(newM2()) + + expectedFields := map[string]interface{}{ + "request": "/api/search/?category=plugins&q=regex&sort=asc", + "method": "/search/", + "search_category": "plugins", + "ignore_number": int64(200), + "ignore_bool": true, + } + expectedTags := map[string]string{ + "verb": "GET", + "resp_code": "200", + "resp_code_group": "2xx", + "resp_code_text": "OK", + } + + assert.Equal(t, expectedFields, processed[0].Fields()) + assert.Equal(t, expectedTags, processed[0].Tags()) +} + +func TestNoMatches(t *testing.T) { + tests := []struct { + message string + converter converter + expectedFields map[string]interface{} + }{ + { + message: "Should not change anything if there is no field with given key", + converter: converter{ + Key: "not_exists", + Pattern: "\\.*", + Replacement: "x", + }, + expectedFields: map[string]interface{}{ + "request": "/users/42/", + }, + }, + { + message: "Should not change anything if regex doesn't match", + converter: converter{ + Key: "request", + Pattern: "not_match", + Replacement: "x", + }, + expectedFields: map[string]interface{}{ + "request": "/users/42/", + }, + }, + { + message: "Should emit empty string when result_key given but regex doesn't match", + converter: converter{ + Key: "request", + Pattern: "not_match", + Replacement: "x", + ResultKey: "new_field", + }, + expectedFields: map[string]interface{}{ + "request": "/users/42/", + "new_field": "", + }, + }, + } + + for _, test := range tests { + regex := NewRegex() + regex.Fields = []converter{ + test.converter, + } + + processed := regex.Apply(newM1()) + + assert.Equal(t, test.expectedFields, processed[0].Fields(), test.message) + } +} + +func BenchmarkConversions(b *testing.B) { + regex := NewRegex() + regex.Tags = []converter{ + { + Key: "resp_code", + Pattern: "^(\\d)\\d\\d$", + Replacement: "${1}xx", + ResultKey: "resp_code_group", + }, + } + regex.Fields = []converter{ + { + Key: "request", + Pattern: "^/users/\\d+/$", + Replacement: "/users/{id}/", + }, + } + + for n := 0; n < b.N; n++ { + processed := regex.Apply(newM1()) + _ = processed + } +}