From 9f8de25e0ebd921dd7b0a8071b412499563900ee Mon Sep 17 00:00:00 2001 From: Ayrdrie Date: Wed, 22 Aug 2018 16:28:50 -0700 Subject: [PATCH] Add parser processor (#4551) --- metric/builder.go | 4 +- plugins/processors/all/all.go | 1 + plugins/processors/parser/README.md | 43 ++ plugins/processors/parser/parser.go | 124 +++++ plugins/processors/parser/parser_test.go | 670 +++++++++++++++++++++++ 5 files changed, 841 insertions(+), 1 deletion(-) create mode 100644 plugins/processors/parser/README.md create mode 100644 plugins/processors/parser/parser.go create mode 100644 plugins/processors/parser/parser_test.go diff --git a/metric/builder.go b/metric/builder.go index c579046df..9a331b9a4 100644 --- a/metric/builder.go +++ b/metric/builder.go @@ -41,7 +41,9 @@ func (b *Builder) SetTime(tm time.Time) { } func (b *Builder) Reset() { - b.metric = &metric{} + b.metric = &metric{ + tp: telegraf.Untyped, + } } func (b *Builder) Metric() (telegraf.Metric, error) { diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index f581ea602..5c2e2549e 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/converter" _ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/override" + _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/rename" diff --git a/plugins/processors/parser/README.md b/plugins/processors/parser/README.md new file mode 100644 index 000000000..7564a75ea --- /dev/null +++ b/plugins/processors/parser/README.md @@ -0,0 +1,43 @@ +# Parser Processor Plugin + +This plugin parses defined fields containing the specified data format and +creates new metrics based on the contents of the field. + +## Configuration +```toml +[[processors.parser]] + ## The name of the fields whose value will be parsed. + parse_fields = ["message"] + + ## If true, incoming metrics are not emitted. + drop_original = false + + ## If set to override, emitted metrics will be merged by overriding the + ## original metric using the newly parsed metrics. + merge = "override" + + ## The dataformat to be read from files + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +### Example: + +```toml +[[processors.parser]] + parse_fields = ["message"] + merge = "override" + data_format = "logfmt" +``` + +**Input**: +``` +syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",message=" ts=2018-08-09T21:01:48.137963Z lvl=info msg=\"Executing query\" log_id=09p7QbOG000 service=query query=\"SHOW DATABASES\"" +``` + +**Output**: +``` +syslog,appname=influxd,facility=daemon,hostname=http://influxdb.example.org (influxdb.example.org),severity=info version=1i,severity_code=6i,facility_code=3i,timestamp=1533848508138040000i,procid="6629",ts="2018-08-09T21:01:48.137963Z",lvl=info msg="Executing query",log_id="09p7QbOG000",service="query",query="SHOW DATABASES" +``` diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go new file mode 100644 index 000000000..63230763a --- /dev/null +++ b/plugins/processors/parser/parser.go @@ -0,0 +1,124 @@ +package parser + +import ( + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Parser struct { + parsers.Config + DropOriginal bool `toml:"drop_original"` + Merge string `toml:"merge"` + ParseFields []string `toml:"parse_fields"` + Parser parsers.Parser +} + +var SampleConfig = ` + ## The name of the fields whose value will be parsed. + parse_fields = [] + + ## If true, incoming metrics are not emitted. + drop_original = false + + ## If set to override, emitted metrics will be merged by overriding the + ## original metric using the newly parsed metrics. + merge = "override" + + ## The dataformat to be read from files + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (p *Parser) SampleConfig() string { + return SampleConfig +} + +func (p *Parser) Description() string { + return "Parse a value in a specified field/tag(s) and add the result in a new metric" +} + +func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + if p.Parser == nil { + var err error + p.Parser, err = parsers.NewParser(&p.Config) + if err != nil { + log.Printf("E! [processors.parser] could not create parser: %v", err) + return metrics + } + } + + results := []telegraf.Metric{} + + for _, metric := range metrics { + newMetrics := []telegraf.Metric{} + if !p.DropOriginal { + newMetrics = append(newMetrics, metric) + } + + for _, key := range p.ParseFields { + for _, field := range metric.FieldList() { + if field.Key == key { + switch value := field.Value.(type) { + case string: + fromFieldMetric, err := p.parseField(value) + if err != nil { + log.Printf("E! [processors.parser] could not parse field %s: %v", key, err) + } + + for _, m := range fromFieldMetric { + if m.Name() == "" { + m.SetName(metric.Name()) + } + } + + // multiple parsed fields shouldn't create multiple + // metrics so we'll merge tags/fields down into one + // prior to returning. + newMetrics = append(newMetrics, fromFieldMetric...) + default: + log.Printf("E! [processors.parser] field '%s' not a string, skipping", key) + } + } + } + } + + if len(newMetrics) == 0 { + continue + } + + if p.Merge == "override" { + results = append(results, merge(newMetrics[0], newMetrics[1:])) + } else { + results = append(results, newMetrics...) + } + } + return results +} + +func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric { + for _, metric := range metrics { + for _, field := range metric.FieldList() { + base.AddField(field.Key, field.Value) + } + for _, tag := range metric.TagList() { + base.AddTag(tag.Key, tag.Value) + } + base.SetName(metric.Name()) + } + return base +} + +func (p *Parser) parseField(value string) ([]telegraf.Metric, error) { + return p.Parser.Parse([]byte(value)) +} + +func init() { + processors.Add("parser", func() telegraf.Processor { + return &Parser{DropOriginal: false} + }) +} diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go new file mode 100644 index 000000000..ac042848f --- /dev/null +++ b/plugins/processors/parser/parser_test.go @@ -0,0 +1,670 @@ +package parser + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +//compares metrics without comparing time +func compareMetrics(t *testing.T, expected, actual []telegraf.Metric) { + assert.Equal(t, len(expected), len(actual)) + for i, metric := range actual { + require.Equal(t, expected[i].Name(), metric.Name()) + require.Equal(t, expected[i].Fields(), metric.Fields()) + require.Equal(t, expected[i].Tags(), metric.Tags()) + } +} + +func Metric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestApply(t *testing.T) { + tests := []struct { + name string + parseFields []string + config parsers.Config + dropOriginal bool + merge string + input telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "parse one field drop original", + parseFields: []string{"sample"}, + dropOriginal: true, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{ + "ts", + "lvl", + "msg", + "method", + }, + }, + input: Metric( + metric.New( + "singleField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "sample": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "singleField", + map[string]string{ + "ts": "2018-07-24T19:43:40.275Z", + "lvl": "info", + "msg": "http request", + "method": "POST", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "parse one field with merge", + parseFields: []string{"sample"}, + dropOriginal: false, + merge: "override", + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{ + "ts", + "lvl", + "msg", + "method", + }, + }, + input: Metric( + metric.New( + "singleField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "sample": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "singleField", + map[string]string{ + "some": "tag", + "ts": "2018-07-24T19:43:40.275Z", + "lvl": "info", + "msg": "http request", + "method": "POST", + }, + map[string]interface{}{ + "sample": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))), + }, + }, + { + name: "parse one field keep", + parseFields: []string{"sample"}, + dropOriginal: false, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{ + "ts", + "lvl", + "msg", + "method", + }, + }, + input: Metric( + metric.New( + "singleField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "sample": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "singleField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "sample": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))), + Metric(metric.New( + "singleField", + map[string]string{ + "ts": "2018-07-24T19:43:40.275Z", + "lvl": "info", + "msg": "http request", + "method": "POST", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "parse one field keep with measurement name", + parseFields: []string{"message"}, + config: parsers.Config{ + DataFormat: "influx", + }, + dropOriginal: false, + input: Metric( + metric.New( + "influxField", + map[string]string{}, + map[string]interface{}{ + "message": "deal,computer_name=hosta message=\"stuff\" 1530654676316265790", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "influxField", + map[string]string{}, + map[string]interface{}{ + "message": "deal,computer_name=hosta message=\"stuff\" 1530654676316265790", + }, + time.Unix(0, 0))), + Metric(metric.New( + "deal", + map[string]string{ + "computer_name": "hosta", + }, + map[string]interface{}{ + "message": "stuff", + }, + time.Unix(0, 0))), + }, + }, + { + name: "parse one field override replaces name", + parseFields: []string{"message"}, + dropOriginal: false, + merge: "override", + config: parsers.Config{ + DataFormat: "influx", + }, + input: Metric( + metric.New( + "influxField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "message": "deal,computer_name=hosta message=\"stuff\" 1530654676316265790", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "deal", + map[string]string{ + "computer_name": "hosta", + "some": "tag", + }, + map[string]interface{}{ + "message": "stuff", + }, + time.Unix(0, 0))), + }, + }, + { + name: "parse grok field", + parseFields: []string{"grokSample"}, + dropOriginal: true, + config: parsers.Config{ + DataFormat: "grok", + GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"}, + }, + input: Metric( + metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "grokSample": "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{ + "resp_code": "200", + "verb": "GET", + }, + map[string]interface{}{ + "resp_bytes": int64(3891), + "auth": "-", + "request": "/xampp/status.php", + "referrer": "http://cadenza/xampp/navi.php", + "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0", + "client_ip": "127.0.0.1", + "ident": "-", + "http_version": float64(1.1), + }, + time.Unix(0, 0))), + }, + }, + { + name: "parse two fields [replace]", + parseFields: []string{"field_1", "field_2"}, + dropOriginal: true, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl", "err"}, + }, + input: Metric( + metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `{"lvl":"info","msg":"http request"}`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "bigMeasure", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + Metric(metric.New( + "bigMeasure", + map[string]string{ + "err": "fatal", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "parse two fields [merge]", + parseFields: []string{"field_1", "field_2"}, + dropOriginal: false, + merge: "override", + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl", "msg", "err", "fatal"}, + }, + input: Metric( + metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `{"lvl":"info","msg":"http request"}`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "bigMeasure", + map[string]string{ + "lvl": "info", + "msg": "http request", + "err": "fatal", + "fatal": "security threat", + }, + map[string]interface{}{ + "field_1": `{"lvl":"info","msg":"http request"}`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0))), + }, + }, + { + name: "parse two fields [keep]", + parseFields: []string{"field_1", "field_2"}, + dropOriginal: false, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl", "msg", "err", "fatal"}, + }, + input: Metric( + metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `{"lvl":"info","msg":"http request"}`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `{"lvl":"info","msg":"http request"}`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0))), + Metric(metric.New( + "bigMeasure", + map[string]string{ + "lvl": "info", + "msg": "http request", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + Metric(metric.New( + "bigMeasure", + map[string]string{ + "err": "fatal", + "fatal": "security threat", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "Fail to parse one field but parses other [keep]", + parseFields: []string{"good", "bad"}, + dropOriginal: false, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl"}, + }, + input: Metric( + metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "good": `{"lvl":"info"}`, + "bad": "why", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "good": `{"lvl":"info"}`, + "bad": "why", + }, + time.Unix(0, 0))), + Metric(metric.New( + "success", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "Fail to parse one field but parses other [keep] v2", + parseFields: []string{"bad", "good", "ok"}, + dropOriginal: false, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl", "thing"}, + }, + input: Metric( + metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "bad": "why", + "good": `{"lvl":"info"}`, + "ok": `{"thing":"thang"}`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "bad": "why", + "good": `{"lvl":"info"}`, + "ok": `{"thing":"thang"}`, + }, + time.Unix(0, 0))), + Metric(metric.New( + "success", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + Metric(metric.New( + "success", + map[string]string{ + "thing": "thang", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + { + name: "Fail to parse one field but parses other [merge]", + parseFields: []string{"good", "bad"}, + dropOriginal: false, + merge: "override", + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl"}, + }, + input: Metric( + metric.New( + "success", + map[string]string{ + "a": "tag", + }, + map[string]interface{}{ + "good": `{"lvl":"info"}`, + "bad": "why", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{ + "a": "tag", + "lvl": "info", + }, + map[string]interface{}{ + "good": `{"lvl":"info"}`, + "bad": "why", + }, + time.Unix(0, 0))), + }, + }, + { + name: "Fail to parse one field but parses other [replace]", + parseFields: []string{"good", "bad"}, + dropOriginal: true, + config: parsers.Config{ + DataFormat: "json", + TagKeys: []string{"lvl"}, + }, + input: Metric( + metric.New( + "success", + map[string]string{ + "thing": "tag", + }, + map[string]interface{}{ + "good": `{"lvl":"info"}`, + "bad": "why", + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{ + "lvl": "info", + }, + map[string]interface{}{}, + time.Unix(0, 0))), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := Parser{ + Config: tt.config, + ParseFields: tt.parseFields, + DropOriginal: tt.dropOriginal, + Merge: tt.merge, + } + + output := parser.Apply(tt.input) + t.Logf("Testing: %s", tt.name) + compareMetrics(t, tt.expected, output) + }) + } +} + +func TestBadApply(t *testing.T) { + tests := []struct { + name string + parseFields []string + config parsers.Config + input telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "field not found", + parseFields: []string{"bad_field"}, + config: parsers.Config{ + DataFormat: "json", + }, + input: Metric( + metric.New( + "bad", + map[string]string{}, + map[string]interface{}{ + "some_field": 5, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "bad", + map[string]string{}, + map[string]interface{}{ + "some_field": 5, + }, + time.Unix(0, 0))), + }, + }, + { + name: "non string field", + parseFields: []string{"some_field"}, + config: parsers.Config{ + DataFormat: "json", + }, + input: Metric( + metric.New( + "bad", + map[string]string{}, + map[string]interface{}{ + "some_field": 5, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "bad", + map[string]string{}, + map[string]interface{}{ + "some_field": 5, + }, + time.Unix(0, 0))), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := Parser{ + Config: tt.config, + ParseFields: tt.parseFields, + } + + output := parser.Apply(tt.input) + + compareMetrics(t, output, tt.expected) + }) + } +} + +// Benchmarks + +func getMetricFields(metric telegraf.Metric) interface{} { + key := "field3" + if value, ok := metric.Fields()[key]; ok { + return value + } + return nil +} + +func getMetricFieldList(metric telegraf.Metric) interface{} { + key := "field3" + fields := metric.FieldList() + for _, field := range fields { + if field.Key == key { + return field.Value + } + } + return nil +} + +func BenchmarkFieldListing(b *testing.B) { + metric := Metric(metric.New( + "test", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "field0": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field1": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field2": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field3": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field4": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field5": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field6": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))) + + for n := 0; n < b.N; n++ { + getMetricFieldList(metric) + } +} + +func BenchmarkFields(b *testing.B) { + metric := Metric(metric.New( + "test", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "field0": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field1": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field2": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field3": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field4": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field5": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + "field6": `{"ts":"2018-07-24T19:43:40.275Z","lvl":"info","msg":"http request","method":"POST"}`, + }, + time.Unix(0, 0))) + + for n := 0; n < b.N; n++ { + getMetricFields(metric) + } +}