Add filepath processor plugin (#7418)

This commit is contained in:
Andrés Álvarez 2020-05-01 20:21:41 +02:00 committed by GitHub
parent 59acbd4f13
commit b1ae81bb75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 572 additions and 0 deletions

View File

@ -363,6 +363,7 @@ For documentation on the latest development code see the [documentation index][d
* [date](/plugins/processors/date) * [date](/plugins/processors/date)
* [dedup](/plugins/processors/dedup) * [dedup](/plugins/processors/dedup)
* [enum](/plugins/processors/enum) * [enum](/plugins/processors/enum)
* [filepath](/plugins/processors/filepath)
* [override](/plugins/processors/override) * [override](/plugins/processors/override)
* [parser](/plugins/processors/parser) * [parser](/plugins/processors/parser)
* [pivot](/plugins/processors/pivot) * [pivot](/plugins/processors/pivot)

View File

@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/date" _ "github.com/influxdata/telegraf/plugins/processors/date"
_ "github.com/influxdata/telegraf/plugins/processors/dedup" _ "github.com/influxdata/telegraf/plugins/processors/dedup"
_ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/filepath"
_ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/pivot" _ "github.com/influxdata/telegraf/plugins/processors/pivot"

View File

@ -0,0 +1,207 @@
# Filepath Processor Plugin
The `filepath` processor plugin maps certain go functions from [path/filepath](https://golang.org/pkg/path/filepath/)
onto tag and field values. Values can be modified in place or stored in another key.
Implemented functions are:
* [Base](https://golang.org/pkg/path/filepath/#Base) (accessible through `[[processors.filepath.basename]]`)
* [Rel](https://golang.org/pkg/path/filepath/#Rel) (accessible through `[[processors.filepath.rel]]`)
* [Dir](https://golang.org/pkg/path/filepath/#Dir) (accessible through `[[processors.filepath.dir]]`)
* [Clean](https://golang.org/pkg/path/filepath/#Clean) (accessible through `[[processors.filepath.clean]]`)
* [ToSlash](https://golang.org/pkg/path/filepath/#ToSlash) (accessible through `[[processors.filepath.toslash]]`)
On top of that, the plugin provides an extra function to retrieve the final path component without its extension. This
function is accessible through the `[[processors.filepath.stem]]` configuration item.
Please note that, in this implementation, these functions are processed in the order that they appear above( except for
`stem` that is applied in the first place).
Specify the `tag` and/or `field` that you want processed in each section and optionally a `dest` if you want the result
stored in a new tag or field.
If you plan to apply multiple transformations to the same `tag`/`field`, bear in mind the processing order stated above.
## Configuration
```toml
[[processors.filepath]]
## Treat the tag value as a path and convert it to its last element, storing the result in a new tag
# [[processors.filepath.basename]]
# tag = "path"
# dest = "basepath"
## Treat the field value as a path and keep all but the last element of path, typically the path's directory
# [[processors.filepath.dirname]]
# field = "path"
## Treat the tag value as a path, converting it to its the last element without its suffix
# [[processors.filepath.stem]]
# tag = "path"
## Treat the tag value as a path, converting it to the shortest path name equivalent
## to path by purely lexical processing
# [[processors.filepath.clean]]
# tag = "path"
## Treat the tag value as a path, converting it to a relative path that is lexically
## equivalent to the source path when joined to 'base_path'
# [[processors.filepath.rel]]
# tag = "path"
# base_path = "/var/log"
## Treat the tag value as a path, replacing each separator character in path with a '/' character. Has only
## effect on Windows
# [[processors.filepath.toslash]]
# tag = "path"
```
## Considerations
### Clean
Even though `clean` is provided a standalone function, it is also invoked when using the `rel` and `dirname` functions,
so there is no need to use it along with them.
That is:
```toml
[[processors.filepath]]
[[processors.filepath.dir]]
tag = "path"
[[processors.filepath.clean]]
tag = "path"
```
Is equivalent to:
```toml
[[processors.filepath]]
[[processors.filepath.dir]]
tag = "path"
```
### ToSlash
The effects of this function are only noticeable on Windows platforms, because of the underlying golang implementation.
## Examples
### Basename
```toml
[[processors.filepath]]
[[processors.filepath.basename]]
tag = "path"
```
```diff
- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000
+ my_metric,path="ajob.log" duration_seconds=134 1587920425000000000
```
### Dirname
```toml
[[processors.filepath]]
[[processors.filepath.dirname]]
field = "path"
dest = "folder"
```
```diff
- my_metric path="/var/log/batch/ajob.log",duration_seconds=134 1587920425000000000
+ my_metric path="/var/log/batch/ajob.log",folder="/var/log/batch",duration_seconds=134 1587920425000000000
```
### Stem
```toml
[[processors.filepath]]
[[processors.filepath.stem]]
tag = "path"
```
```diff
- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000
+ my_metric,path="ajob" duration_seconds=134 1587920425000000000
```
### Clean
```toml
[[processors.filepath]]
[[processors.filepath.clean]]
tag = "path"
```
```diff
- my_metric,path="/var/log/dummy/../batch//ajob.log" duration_seconds=134 1587920425000000000
+ my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000
```
### Rel
```toml
[[processors.filepath]]
[[processors.filepath.rel]]
tag = "path"
base_path = "/var/log"
```
```diff
- my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000
+ my_metric,path="batch/ajob.log" duration_seconds=134 1587920425000000000
```
### ToSlash
```toml
[[processors.filepath]]
[[processors.filepath.rel]]
tag = "path"
```
```diff
- my_metric,path="\var\log\batch\ajob.log" duration_seconds=134 1587920425000000000
+ my_metric,path="/var/log/batch/ajob.log" duration_seconds=134 1587920425000000000
```
### Processing paths from tail plugin
This plugin can be used together with the
[tail input plugn](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/tail) to make modifications
to the `path` tag injected for every file.
Scenario:
* A log file `/var/log/myjobs/mysql_backup.log`, containing logs for a job execution. Whenever the job ends, a line is
written to the log file following this format: `2020-04-05 11:45:21 total time execution: 70 seconds`
* We want to generate a measurement that captures the duration of the script as a field and includes the `path` as a
tag
* We are interested in the filename without its extensions, since it might be enough information for plotting our
execution times in a dashboard
* Just in case, we don't want to override the original path (if for some reason we end up having duplicates we might
want this information)
For this purpose, we will use the `tail` input plugin, the `grok` parser plugin and the `filepath` processor.
```toml
[[inputs.tail]]
files = ["/var/log/myjobs/**.log"]
data_format = "grok"
grok_patterns = ['%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05"} total time execution: %{NUMBER:duration_seconds:int}']
name_override = "myjobs"
[[processors.filepath]]
[[processors.filepath.stem]]
tag = "path"
dest = "stempath"
```
The resulting output for a job taking 70 seconds for the mentioned log file would look like:
```text
myjobs_duration_seconds,host="my-host",path="/var/log/myjobs/mysql_backup.log",stempath="mysql_backup" 70 1587920425000000000
```

View File

@ -0,0 +1,150 @@
package filepath
import (
"path/filepath"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
type Options struct {
BaseName []BaseOpts `toml:"basename"`
DirName []BaseOpts `toml:"dirname"`
Stem []BaseOpts
Clean []BaseOpts
Rel []RelOpts
ToSlash []BaseOpts `toml:"toslash"`
}
type ProcessorFunc func(s string) string
// BaseOpts contains options applicable to every function
type BaseOpts struct {
Field string
Tag string
Dest string
}
type RelOpts struct {
BaseOpts
BasePath string
}
const sampleConfig = `
## Treat the tag value as a path and convert it to its last element, storing the result in a new tag
# [[processors.filepath.basename]]
# tag = "path"
# dest = "basepath"
## Treat the field value as a path and keep all but the last element of path, typically the path's directory
# [[processors.filepath.dirname]]
# field = "path"
## Treat the tag value as a path, converting it to its the last element without its suffix
# [[processors.filepath.stem]]
# tag = "path"
## Treat the tag value as a path, converting it to the shortest path name equivalent
## to path by purely lexical processing
# [[processors.filepath.clean]]
# tag = "path"
## Treat the tag value as a path, converting it to a relative path that is lexically
## equivalent to the source path when joined to 'base_path'
# [[processors.filepath.rel]]
# tag = "path"
# base_path = "/var/log"
## Treat the tag value as a path, replacing each separator character in path with a '/' character. Has only
## effect on Windows
# [[processors.filepath.toslash]]
# tag = "path"
`
func (o *Options) SampleConfig() string {
return sampleConfig
}
func (o *Options) Description() string {
return "Performs file path manipulations on tags and fields"
}
// applyFunc applies the specified function to the metric
func (o *Options) applyFunc(bo BaseOpts, fn ProcessorFunc, metric telegraf.Metric) {
if bo.Tag != "" {
if v, ok := metric.GetTag(bo.Tag); ok {
targetTag := bo.Tag
if bo.Dest != "" {
targetTag = bo.Dest
}
metric.AddTag(targetTag, fn(v))
}
}
if bo.Field != "" {
if v, ok := metric.GetField(bo.Field); ok {
targetField := bo.Field
if bo.Dest != "" {
targetField = bo.Dest
}
// Only string fields are considered
if v, ok := v.(string); ok {
metric.AddField(targetField, fn(v))
}
}
}
}
func stemFilePath(path string) string {
return strings.TrimSuffix(filepath.Base(path), filepath.Ext(path))
}
// processMetric processes fields and tag values for a given metric applying the selected transformations
func (o *Options) processMetric(metric telegraf.Metric) {
// Stem
for _, v := range o.Stem {
o.applyFunc(v, stemFilePath, metric)
}
// Basename
for _, v := range o.BaseName {
o.applyFunc(v, filepath.Base, metric)
}
// Rel
for _, v := range o.Rel {
o.applyFunc(v.BaseOpts, func(s string) string {
relPath, _ := filepath.Rel(v.BasePath, s)
return relPath
}, metric)
}
// Dirname
for _, v := range o.DirName {
o.applyFunc(v, filepath.Dir, metric)
}
// Clean
for _, v := range o.Clean {
o.applyFunc(v, filepath.Clean, metric)
}
// ToSlash
for _, v := range o.ToSlash {
o.applyFunc(v, filepath.ToSlash, metric)
}
}
func (o *Options) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, m := range in {
o.processMetric(m)
}
return in
}
func init() {
processors.Add("filepath", func() telegraf.Processor {
return &Options{}
})
}

View File

@ -0,0 +1,70 @@
// +build !windows
package filepath
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
var samplePath = "/my/test//c/../path/file.log"
func TestOptions_Apply(t *testing.T) {
tests := []testCase{
{
name: "Smoke Test",
o: newOptions("/my/test/"),
inputMetrics: getSmokeTestInputMetrics(samplePath),
expectedMetrics: []telegraf.Metric{
testutil.MustMetric(
smokeMetricName,
map[string]string{
"baseTag": "file.log",
"dirTag": "/my/test/path",
"stemTag": "file",
"cleanTag": "/my/test/path/file.log",
"relTag": "path/file.log",
"slashTag": "/my/test//c/../path/file.log",
},
map[string]interface{}{
"baseField": "file.log",
"dirField": "/my/test/path",
"stemField": "file",
"cleanField": "/my/test/path/file.log",
"relField": "path/file.log",
"slashField": "/my/test//c/../path/file.log",
},
time.Now()),
},
},
{
name: "Test Dest Option",
o: &Options{
BaseName: []BaseOpts{
{
Field: "sourcePath",
Tag: "sourcePath",
Dest: "basePath",
},
}},
inputMetrics: []telegraf.Metric{
testutil.MustMetric(
"testMetric",
map[string]string{"sourcePath": samplePath},
map[string]interface{}{"sourcePath": samplePath},
time.Now()),
},
expectedMetrics: []telegraf.Metric{
testutil.MustMetric(
"testMetric",
map[string]string{"sourcePath": samplePath, "basePath": "file.log"},
map[string]interface{}{"sourcePath": samplePath, "basePath": "file.log"},
time.Now()),
},
},
}
runTestOptionsApply(t, tests)
}

View File

@ -0,0 +1,100 @@
package filepath
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
const smokeMetricName = "testmetric"
type testCase struct {
name string
o *Options
inputMetrics []telegraf.Metric
expectedMetrics []telegraf.Metric
}
func newOptions(basePath string) *Options {
return &Options{
BaseName: []BaseOpts{
{
Field: "baseField",
Tag: "baseTag",
},
},
DirName: []BaseOpts{
{
Field: "dirField",
Tag: "dirTag",
},
},
Stem: []BaseOpts{
{
Field: "stemField",
Tag: "stemTag",
},
},
Clean: []BaseOpts{
{
Field: "cleanField",
Tag: "cleanTag",
},
},
Rel: []RelOpts{
{
BaseOpts: BaseOpts{
Field: "relField",
Tag: "relTag",
},
BasePath: basePath,
},
},
ToSlash: []BaseOpts{
{
Field: "slashField",
Tag: "slashTag",
},
},
}
}
func getSampleMetricTags(path string) map[string]string {
return map[string]string{
"baseTag": path,
"dirTag": path,
"stemTag": path,
"cleanTag": path,
"relTag": path,
"slashTag": path,
}
}
func getSampleMetricFields(path string) map[string]interface{} {
return map[string]interface{}{
"baseField": path,
"dirField": path,
"stemField": path,
"cleanField": path,
"relField": path,
"slashField": path,
}
}
func getSmokeTestInputMetrics(path string) []telegraf.Metric {
return []telegraf.Metric{
testutil.MustMetric(smokeMetricName, getSampleMetricTags(path), getSampleMetricFields(path),
time.Now()),
}
}
func runTestOptionsApply(t *testing.T, tests []testCase) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.o.Apply(tt.inputMetrics...)
testutil.RequireMetricsEqual(t, tt.expectedMetrics, got, testutil.SortMetrics(), testutil.IgnoreTime())
})
}
}

View File

@ -0,0 +1,43 @@
package filepath
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
var samplePath = "c:\\my\\test\\\\c\\..\\path\\file.log"
func TestOptions_Apply(t *testing.T) {
tests := []testCase{
{
name: "Smoke Test",
o: newOptions("c:\\my\\test\\"),
inputMetrics: getSmokeTestInputMetrics(samplePath),
expectedMetrics: []telegraf.Metric{
testutil.MustMetric(
smokeMetricName,
map[string]string{
"baseTag": "file.log",
"dirTag": "c:\\my\\test\\path",
"stemTag": "file",
"cleanTag": "c:\\my\\test\\path\\file.log",
"relTag": "path\\file.log",
"slashTag": "c:/my/test//c/../path/file.log",
},
map[string]interface{}{
"baseField": "file.log",
"dirField": "c:\\my\\test\\path",
"stemField": "file",
"cleanField": "c:\\my\\test\\path\\file.log",
"relField": "path\\file.log",
"slashField": "c:/my/test//c/../path/file.log",
},
time.Now()),
},
},
}
runTestOptionsApply(t, tests)
}