From b1ae81bb75464302aa4586ceaded8921ffaa42f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20=C3=81lvarez?= <1671935+kir4h@users.noreply.github.com> Date: Fri, 1 May 2020 20:21:41 +0200 Subject: [PATCH] Add filepath processor plugin (#7418) --- README.md | 1 + plugins/processors/all/all.go | 1 + plugins/processors/filepath/README.md | 207 ++++++++++++++++++ plugins/processors/filepath/filepath.go | 150 +++++++++++++ plugins/processors/filepath/filepath_test.go | 70 ++++++ .../filepath/filepath_test_helpers.go | 100 +++++++++ .../filepath/filepath_windows_test.go | 43 ++++ 7 files changed, 572 insertions(+) create mode 100644 plugins/processors/filepath/README.md create mode 100644 plugins/processors/filepath/filepath.go create mode 100644 plugins/processors/filepath/filepath_test.go create mode 100644 plugins/processors/filepath/filepath_test_helpers.go create mode 100644 plugins/processors/filepath/filepath_windows_test.go diff --git a/README.md b/README.md index 571272b32..ec203c1f2 100644 --- a/README.md +++ b/README.md @@ -363,6 +363,7 @@ For documentation on the latest development code see the [documentation index][d * [date](/plugins/processors/date) * [dedup](/plugins/processors/dedup) * [enum](/plugins/processors/enum) +* [filepath](/plugins/processors/filepath) * [override](/plugins/processors/override) * [parser](/plugins/processors/parser) * [pivot](/plugins/processors/pivot) diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index ab6746c62..5ff977324 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/date" _ "github.com/influxdata/telegraf/plugins/processors/dedup" _ "github.com/influxdata/telegraf/plugins/processors/enum" + _ "github.com/influxdata/telegraf/plugins/processors/filepath" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/pivot" diff --git a/plugins/processors/filepath/README.md b/plugins/processors/filepath/README.md new file mode 100644 index 000000000..f4473ff62 --- /dev/null +++ b/plugins/processors/filepath/README.md @@ -0,0 +1,207 @@ +# Filepath Processor Plugin + +The `filepath` processor plugin maps certain go functions from [path/filepath](https://golang.org/pkg/path/filepath/) +onto tag and field values. Values can be modified in place or stored in another key. + +Implemented functions are: + +* [Base](https://golang.org/pkg/path/filepath/#Base) (accessible through `[[processors.filepath.basename]]`) +* [Rel](https://golang.org/pkg/path/filepath/#Rel) (accessible through `[[processors.filepath.rel]]`) +* [Dir](https://golang.org/pkg/path/filepath/#Dir) (accessible through `[[processors.filepath.dir]]`) +* [Clean](https://golang.org/pkg/path/filepath/#Clean) (accessible through `[[processors.filepath.clean]]`) +* [ToSlash](https://golang.org/pkg/path/filepath/#ToSlash) (accessible through `[[processors.filepath.toslash]]`) + + On top of that, the plugin provides an extra function to retrieve the final path component without its extension. This + function is accessible through the `[[processors.filepath.stem]]` configuration item. + +Please note that, in this implementation, these functions are processed in the order that they appear above( except for +`stem` that is applied in the first place). + +Specify the `tag` and/or `field` that you want processed in each section and optionally a `dest` if you want the result +stored in a new tag or field. + +If you plan to apply multiple transformations to the same `tag`/`field`, bear in mind the processing order stated above. + +## Configuration + +```toml +[[processors.filepath]] + ## Treat the tag value as a path and convert it to its last element, storing the result in a new tag + # [[processors.filepath.basename]] + # tag = "path" + # dest = "basepath" + + ## Treat the field value as a path and keep all but the last element of path, typically the path's directory + # [[processors.filepath.dirname]] + # field = "path" + + ## Treat the tag value as a path, converting it to its the last element without its suffix + # [[processors.filepath.stem]] + # tag = "path" + + ## Treat the tag value as a path, converting it to the shortest path name equivalent + ## to path by purely lexical processing + # [[processors.filepath.clean]] + # tag = "path" + + ## Treat the tag value as a path, converting it to a relative path that is lexically + ## equivalent to the source path when joined to 'base_path' + # [[processors.filepath.rel]] + # tag = "path" + # base_path = "/var/log" + + ## Treat the tag value as a path, replacing each separator character in path with a '/' character. Has only + ## effect on Windows + # [[processors.filepath.toslash]] + # tag = "path" +``` + +## Considerations + +### Clean + +Even though `clean` is provided a standalone function, it is also invoked when using the `rel` and `dirname` functions, +so there is no need to use it along with them. + +That is: + + ```toml +[[processors.filepath]] + [[processors.filepath.dir]] + tag = "path" + [[processors.filepath.clean]] + tag = "path" + ``` + +Is equivalent to: + + ```toml +[[processors.filepath]] + [[processors.filepath.dir]] + tag = "path" + ``` + +### ToSlash + +The effects of this function are only noticeable on Windows platforms, because of the underlying golang implementation. + +## Examples + +### Basename + +```toml +[[processors.filepath]] + [[processors.filepath.basename]] + tag = "path" +``` + +```diff +- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000 ++ my_metric,path="ajob.log" duration_seconds=134 1587920425000000000 +``` + +### Dirname + +```toml +[[processors.filepath]] + [[processors.filepath.dirname]] + field = "path" + dest = "folder" +``` + +```diff +- my_metric path="/var/log/batch/ajob.log",duration_seconds=134 1587920425000000000 ++ my_metric path="/var/log/batch/ajob.log",folder="/var/log/batch",duration_seconds=134 1587920425000000000 +``` + +### Stem + +```toml +[[processors.filepath]] + [[processors.filepath.stem]] + tag = "path" +``` + +```diff +- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000 ++ my_metric,path="ajob" duration_seconds=134 1587920425000000000 +``` + +### Clean + +```toml +[[processors.filepath]] + [[processors.filepath.clean]] + tag = "path" +``` + +```diff +- my_metric,path="/var/log/dummy/../batch//ajob.log" duration_seconds=134 1587920425000000000 ++ my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000 +``` + +### Rel + +```toml +[[processors.filepath]] + [[processors.filepath.rel]] + tag = "path" + base_path = "/var/log" +``` + +```diff +- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000 ++ my_metric,path="batch/ajob.log" duration_seconds=134 1587920425000000000 +``` + +### ToSlash + +```toml +[[processors.filepath]] + [[processors.filepath.rel]] + tag = "path" +``` + +```diff +- my_metric,path="\var\log\batch\ajob.log" duration_seconds=134 1587920425000000000 ++ my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000 +``` + +### Processing paths from tail plugin + +This plugin can be used together with the +[tail input plugn](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/tail) to make modifications +to the `path` tag injected for every file. + +Scenario: + +* A log file `/var/log/myjobs/mysql_backup.log`, containing logs for a job execution. Whenever the job ends, a line is +written to the log file following this format: `2020-04-05 11:45:21 total time execution: 70 seconds` +* We want to generate a measurement that captures the duration of the script as a field and includes the `path` as a +tag + * We are interested in the filename without its extensions, since it might be enough information for plotting our + execution times in a dashboard + * Just in case, we don't want to override the original path (if for some reason we end up having duplicates we might + want this information) + +For this purpose, we will use the `tail` input plugin, the `grok` parser plugin and the `filepath` processor. + +```toml +[[inputs.tail]] + files = ["/var/log/myjobs/**.log"] + data_format = "grok" + grok_patterns = ['%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05"} total time execution: %{NUMBER:duration_seconds:int}'] + name_override = "myjobs" + +[[processors.filepath]] + [[processors.filepath.stem]] + tag = "path" + dest = "stempath" + +``` + +The resulting output for a job taking 70 seconds for the mentioned log file would look like: + +```text +myjobs_duration_seconds,host="my-host",path="/var/log/myjobs/mysql_backup.log",stempath="mysql_backup" 70 1587920425000000000 +``` diff --git a/plugins/processors/filepath/filepath.go b/plugins/processors/filepath/filepath.go new file mode 100644 index 000000000..70013de17 --- /dev/null +++ b/plugins/processors/filepath/filepath.go @@ -0,0 +1,150 @@ +package filepath + +import ( + "path/filepath" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Options struct { + BaseName []BaseOpts `toml:"basename"` + DirName []BaseOpts `toml:"dirname"` + Stem []BaseOpts + Clean []BaseOpts + Rel []RelOpts + ToSlash []BaseOpts `toml:"toslash"` +} + +type ProcessorFunc func(s string) string + +// BaseOpts contains options applicable to every function +type BaseOpts struct { + Field string + Tag string + Dest string +} + +type RelOpts struct { + BaseOpts + BasePath string +} + +const sampleConfig = ` + ## Treat the tag value as a path and convert it to its last element, storing the result in a new tag + # [[processors.filepath.basename]] + # tag = "path" + # dest = "basepath" + + ## Treat the field value as a path and keep all but the last element of path, typically the path's directory + # [[processors.filepath.dirname]] + # field = "path" + + ## Treat the tag value as a path, converting it to its the last element without its suffix + # [[processors.filepath.stem]] + # tag = "path" + + ## Treat the tag value as a path, converting it to the shortest path name equivalent + ## to path by purely lexical processing + # [[processors.filepath.clean]] + # tag = "path" + + ## Treat the tag value as a path, converting it to a relative path that is lexically + ## equivalent to the source path when joined to 'base_path' + # [[processors.filepath.rel]] + # tag = "path" + # base_path = "/var/log" + + ## Treat the tag value as a path, replacing each separator character in path with a '/' character. Has only + ## effect on Windows + # [[processors.filepath.toslash]] + # tag = "path" +` + +func (o *Options) SampleConfig() string { + return sampleConfig +} + +func (o *Options) Description() string { + return "Performs file path manipulations on tags and fields" +} + +// applyFunc applies the specified function to the metric +func (o *Options) applyFunc(bo BaseOpts, fn ProcessorFunc, metric telegraf.Metric) { + if bo.Tag != "" { + if v, ok := metric.GetTag(bo.Tag); ok { + targetTag := bo.Tag + + if bo.Dest != "" { + targetTag = bo.Dest + } + metric.AddTag(targetTag, fn(v)) + } + } + + if bo.Field != "" { + if v, ok := metric.GetField(bo.Field); ok { + targetField := bo.Field + + if bo.Dest != "" { + targetField = bo.Dest + } + + // Only string fields are considered + if v, ok := v.(string); ok { + metric.AddField(targetField, fn(v)) + } + + } + } +} + +func stemFilePath(path string) string { + return strings.TrimSuffix(filepath.Base(path), filepath.Ext(path)) +} + +// processMetric processes fields and tag values for a given metric applying the selected transformations +func (o *Options) processMetric(metric telegraf.Metric) { + // Stem + for _, v := range o.Stem { + o.applyFunc(v, stemFilePath, metric) + } + // Basename + for _, v := range o.BaseName { + o.applyFunc(v, filepath.Base, metric) + } + // Rel + for _, v := range o.Rel { + o.applyFunc(v.BaseOpts, func(s string) string { + relPath, _ := filepath.Rel(v.BasePath, s) + return relPath + }, metric) + } + // Dirname + for _, v := range o.DirName { + o.applyFunc(v, filepath.Dir, metric) + } + // Clean + for _, v := range o.Clean { + o.applyFunc(v, filepath.Clean, metric) + } + // ToSlash + for _, v := range o.ToSlash { + o.applyFunc(v, filepath.ToSlash, metric) + } +} + +func (o *Options) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, m := range in { + o.processMetric(m) + } + + return in +} + +func init() { + processors.Add("filepath", func() telegraf.Processor { + return &Options{} + }) +} diff --git a/plugins/processors/filepath/filepath_test.go b/plugins/processors/filepath/filepath_test.go new file mode 100644 index 000000000..a305c4c5c --- /dev/null +++ b/plugins/processors/filepath/filepath_test.go @@ -0,0 +1,70 @@ +// +build !windows + +package filepath + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +var samplePath = "/my/test//c/../path/file.log" + +func TestOptions_Apply(t *testing.T) { + tests := []testCase{ + { + name: "Smoke Test", + o: newOptions("/my/test/"), + inputMetrics: getSmokeTestInputMetrics(samplePath), + expectedMetrics: []telegraf.Metric{ + testutil.MustMetric( + smokeMetricName, + map[string]string{ + "baseTag": "file.log", + "dirTag": "/my/test/path", + "stemTag": "file", + "cleanTag": "/my/test/path/file.log", + "relTag": "path/file.log", + "slashTag": "/my/test//c/../path/file.log", + }, + map[string]interface{}{ + "baseField": "file.log", + "dirField": "/my/test/path", + "stemField": "file", + "cleanField": "/my/test/path/file.log", + "relField": "path/file.log", + "slashField": "/my/test//c/../path/file.log", + }, + time.Now()), + }, + }, + { + name: "Test Dest Option", + o: &Options{ + BaseName: []BaseOpts{ + { + Field: "sourcePath", + Tag: "sourcePath", + Dest: "basePath", + }, + }}, + inputMetrics: []telegraf.Metric{ + testutil.MustMetric( + "testMetric", + map[string]string{"sourcePath": samplePath}, + map[string]interface{}{"sourcePath": samplePath}, + time.Now()), + }, + expectedMetrics: []telegraf.Metric{ + testutil.MustMetric( + "testMetric", + map[string]string{"sourcePath": samplePath, "basePath": "file.log"}, + map[string]interface{}{"sourcePath": samplePath, "basePath": "file.log"}, + time.Now()), + }, + }, + } + runTestOptionsApply(t, tests) +} diff --git a/plugins/processors/filepath/filepath_test_helpers.go b/plugins/processors/filepath/filepath_test_helpers.go new file mode 100644 index 000000000..571730b54 --- /dev/null +++ b/plugins/processors/filepath/filepath_test_helpers.go @@ -0,0 +1,100 @@ +package filepath + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +const smokeMetricName = "testmetric" + +type testCase struct { + name string + o *Options + inputMetrics []telegraf.Metric + expectedMetrics []telegraf.Metric +} + +func newOptions(basePath string) *Options { + return &Options{ + BaseName: []BaseOpts{ + { + Field: "baseField", + Tag: "baseTag", + }, + }, + DirName: []BaseOpts{ + { + Field: "dirField", + Tag: "dirTag", + }, + }, + Stem: []BaseOpts{ + { + Field: "stemField", + Tag: "stemTag", + }, + }, + Clean: []BaseOpts{ + { + Field: "cleanField", + Tag: "cleanTag", + }, + }, + Rel: []RelOpts{ + { + BaseOpts: BaseOpts{ + Field: "relField", + Tag: "relTag", + }, + BasePath: basePath, + }, + }, + ToSlash: []BaseOpts{ + { + Field: "slashField", + Tag: "slashTag", + }, + }, + } +} + +func getSampleMetricTags(path string) map[string]string { + return map[string]string{ + "baseTag": path, + "dirTag": path, + "stemTag": path, + "cleanTag": path, + "relTag": path, + "slashTag": path, + } +} + +func getSampleMetricFields(path string) map[string]interface{} { + return map[string]interface{}{ + "baseField": path, + "dirField": path, + "stemField": path, + "cleanField": path, + "relField": path, + "slashField": path, + } +} + +func getSmokeTestInputMetrics(path string) []telegraf.Metric { + return []telegraf.Metric{ + testutil.MustMetric(smokeMetricName, getSampleMetricTags(path), getSampleMetricFields(path), + time.Now()), + } +} + +func runTestOptionsApply(t *testing.T, tests []testCase) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.o.Apply(tt.inputMetrics...) + testutil.RequireMetricsEqual(t, tt.expectedMetrics, got, testutil.SortMetrics(), testutil.IgnoreTime()) + }) + } +} diff --git a/plugins/processors/filepath/filepath_windows_test.go b/plugins/processors/filepath/filepath_windows_test.go new file mode 100644 index 000000000..daca33d18 --- /dev/null +++ b/plugins/processors/filepath/filepath_windows_test.go @@ -0,0 +1,43 @@ +package filepath + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +var samplePath = "c:\\my\\test\\\\c\\..\\path\\file.log" + +func TestOptions_Apply(t *testing.T) { + tests := []testCase{ + { + name: "Smoke Test", + o: newOptions("c:\\my\\test\\"), + inputMetrics: getSmokeTestInputMetrics(samplePath), + expectedMetrics: []telegraf.Metric{ + testutil.MustMetric( + smokeMetricName, + map[string]string{ + "baseTag": "file.log", + "dirTag": "c:\\my\\test\\path", + "stemTag": "file", + "cleanTag": "c:\\my\\test\\path\\file.log", + "relTag": "path\\file.log", + "slashTag": "c:/my/test//c/../path/file.log", + }, + map[string]interface{}{ + "baseField": "file.log", + "dirField": "c:\\my\\test\\path", + "stemField": "file", + "cleanField": "c:\\my\\test\\path\\file.log", + "relField": "path\\file.log", + "slashField": "c:/my/test//c/../path/file.log", + }, + time.Now()), + }, + }, + } + runTestOptionsApply(t, tests) +}