diff --git a/CHANGELOG.md b/CHANGELOG.md index 3df83322a..f1fd5ed43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ ## v0.13 [unreleased] +### Release Notes +- `tagexclude` and `tagexclude` are now available, which can be used to remove +tags from measurements on inputs and outputs. See +[the configuration doc](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md) +for more details. +- **Measurement filtering:** All measurement filters now match based on glob +only. Previously there was an undocumented behavior where filters would match +based on _prefix_ in addition to globs. This means that a filter like +`fielddrop = ["time_"]` will need to be changed to `fielddrop = ["time_*"]` + +### Features +- [#1017](https://github.com/influxdata/telegraf/pull/1017): taginclude and tagexclude arguments. + +### Bugfixes + ## v0.12.1 [2016-04-14] ### Release Notes diff --git a/Godeps b/Godeps index 14430ea5d..71057f497 100644 --- a/Godeps +++ b/Godeps @@ -16,6 +16,7 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 github.com/eclipse/paho.mqtt.golang 0f7a459f04f13a41b7ed752d47944528d4bf9a86 github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee +github.com/gobwas/glob d877f6352135181470c40c73ebb81aefa22115fa github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032 github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7 github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 diff --git a/agent/accumulator.go b/agent/accumulator.go index 7ec22cd7f..6b2ffde2d 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -96,6 +96,7 @@ func (ac *accumulator) AddFields( tags[k] = v } } + ac.inputConfig.Filter.FilterTags(tags) result := make(map[string]interface{}) for k, v := range fields { diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 05f9b02aa..ee8f65e48 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -300,3 +300,35 @@ func TestAddBools(t *testing.T) { fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()), actual) } + +// Test that tag filters get applied to metrics. +func TestAccFilterTags(t *testing.T) { + a := accumulator{} + now := time.Now() + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + filter := internal_models.Filter{ + TagExclude: []string{"acc"}, + } + assert.NoError(t, filter.CompileFilter()) + a.inputConfig = &internal_models.InputConfig{} + a.inputConfig.Filter = filter + + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest value=101 %d", now.UnixNano()), + actual) +} diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 0afaa120f..9f783f87a 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -3,11 +3,20 @@ ## Generating a Configuration File A default Telegraf config file can be generated using the -sample-config flag: -`telegraf -sample-config > telegraf.conf` + +``` +telegraf -sample-config > telegraf.conf +``` To generate a file with specific inputs and outputs, you can use the -input-filter and -output-filter flags: -`telegraf -sample-config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka` + +``` +telegraf -sample-config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka +``` + +You can see the latest config file with all available plugins +[here](https://github.com/influxdata/telegraf/blob/master/etc/telegraf.conf) ## Environment Variables @@ -17,8 +26,8 @@ for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR) ## `[global_tags]` Configuration -Global tags can be specific in the `[global_tags]` section of the config file in -key="value" format. All metrics being gathered on this host will be tagged +Global tags can be specified in the `[global_tags]` section of the config file +in key="value" format. All metrics being gathered on this host will be tagged with the tags specified here. ## `[agent]` Configuration @@ -47,9 +56,35 @@ ie, a jitter of 5s and flush_interval 10s means flushes will happen every 10-15s * **quiet**: Run telegraf in quiet mode. * **hostname**: Override default hostname, if empty use os.Hostname(). -## `[inputs.xxx]` Configuration +#### Measurement Filtering -There are some configuration options that are configurable per input: +Filters can be configured per input or output, see below for examples. + +* **namepass**: An array of strings that is used to filter metrics generated by the +current input. Each string in the array is tested as a glob match against +measurement names and if it matches, the field is emitted. +* **namedrop**: The inverse of pass, if a measurement name matches, it is not emitted. +* **fieldpass**: An array of strings that is used to filter metrics generated by the +current input. Each string in the array is tested as a glob match against field names +and if it matches, the field is emitted. fieldpass is not available for outputs. +* **fielddrop**: The inverse of pass, if a field name matches, it is not emitted. +fielddrop is not available for outputs. +* **tagpass**: tag names and arrays of strings that are used to filter +measurements by the current input. Each string in the array is tested as a glob +match against the tag name, and if it matches the measurement is emitted. +* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not +emitted. This is tested on measurements that have passed the tagpass test. +* **tagexclude**: tagexclude can be used to exclude a tag from measurement(s). +As opposed to tagdrop, which will drop an entire measurement based on it's +tags, tagexclude simply strips the given tag keys from the measurement. This +can be used on inputs & outputs, but it is _recommended_ to be used on inputs, +as it is more efficient to filter out tags at the ingestion point. +* **taginclude**: taginclude is the inverse of tagexclude. It will only include +the tag keys in the final measurement. + +## Input Configuration + +Some configuration options are configurable per input: * **name_override**: Override the base name of the measurement. (Default is the name of the input). @@ -60,24 +95,6 @@ There are some configuration options that are configurable per input: global interval, but if one particular input should be run less or more often, you can configure that here. -#### Input Filters - -There are also filters that can be configured per input: - -* **namepass**: An array of strings that is used to filter metrics generated by the -current input. Each string in the array is tested as a glob match against -measurement names and if it matches, the field is emitted. -* **namedrop**: The inverse of pass, if a measurement name matches, it is not emitted. -* **fieldpass**: An array of strings that is used to filter metrics generated by the -current input. Each string in the array is tested as a glob match against field names -and if it matches, the field is emitted. -* **fielddrop**: The inverse of pass, if a field name matches, it is not emitted. -* **tagpass**: tag names and arrays of strings that are used to filter -measurements by the current input. Each string in the array is tested as a glob -match against the tag name, and if it matches the measurement is emitted. -* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not -emitted. This is tested on measurements that have passed the tagpass test. - #### Input Configuration Examples This is a full working config that will output CPU data to an InfluxDB instance @@ -155,6 +172,20 @@ fields which begin with `time_`. namepass = ["rest_client_*"] ``` +#### Input Config: taginclude and tagexclude + +```toml +# Only include the "cpu" tag in the measurements for the cpu plugin. +[[inputs.cpu]] + percpu = true + totalcpu = true + taginclude = ["cpu"] + +# Exclude the "fstype" tag from the measurements for the disk plugin. +[[inputs.disk]] + tagexclude = ["fstype"] +``` + #### Input config: prefix, suffix, and override This plugin will emit measurements with the name `cpu_total` @@ -180,6 +211,9 @@ This will emit measurements with the name `foobar` This plugin will emit measurements with two additional tags: `tag1=foo` and `tag2=bar` +NOTE: Order matters, the `[inputs.cpu.tags]` table must be at the _end_ of the +plugin definition. + ```toml [[inputs.cpu]] percpu = false @@ -208,15 +242,12 @@ to avoid measurement collisions: fielddrop = ["cpu_time*"] ``` -## `[outputs.xxx]` Configuration +## Output Configuration Telegraf also supports specifying multiple output sinks to send data to, configuring each output sink is different, but examples can be found by running `telegraf -sample-config`. -Outputs also support the same configurable options as inputs -(namepass, namedrop, tagpass, tagdrop) - ```toml [[outputs.influxdb]] urls = [ "http://localhost:8086" ] diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index c8f3b0926..d448872f6 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -28,6 +28,5 @@ - github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE) - gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE) - gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE) -- golang.org/x/crypto/* [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- internal Glob function [MIT LICENSE](https://github.com/ryanuber/go-glob/blob/master/LICENSE) +- golang.org/x/crypto/ [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) diff --git a/internal/config/config.go b/internal/config/config.go index cfd6c9593..5d0836964 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -580,9 +580,9 @@ func (c *Config) addInput(name string, table *ast.Table) error { // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to -// be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix -// filtering on tags and measurements -func buildFilter(tbl *ast.Table) internal_models.Filter { +// be inserted into the internal_models.OutputConfig/internal_models.InputConfig +// to be used for glob filtering on tags and measurements +func buildFilter(tbl *ast.Table) (internal_models.Filter, error) { f := internal_models.Filter{} if node, ok := tbl.Fields["namepass"]; ok { @@ -681,6 +681,33 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { } } + if node, ok := tbl.Fields["tagexclude"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + f.TagExclude = append(f.TagExclude, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["taginclude"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + f.TagInclude = append(f.TagInclude, str.Value) + } + } + } + } + } + if err := f.CompileFilter(); err != nil { + return f, err + } + delete(tbl.Fields, "namedrop") delete(tbl.Fields, "namepass") delete(tbl.Fields, "fielddrop") @@ -689,7 +716,9 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { delete(tbl.Fields, "pass") delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagpass") - return f + delete(tbl.Fields, "tagexclude") + delete(tbl.Fields, "taginclude") + return f, nil } // buildInput parses input specific items from the ast.Table, @@ -748,7 +777,11 @@ func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, erro delete(tbl.Fields, "name_override") delete(tbl.Fields, "interval") delete(tbl.Fields, "tags") - cp.Filter = buildFilter(tbl) + var err error + cp.Filter, err = buildFilter(tbl) + if err != nil { + return cp, err + } return cp, nil } @@ -864,13 +897,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error return serializers.NewSerializer(c) } -// buildOutput parses output specific items from the ast.Table, builds the filter and returns an +// buildOutput parses output specific items from the ast.Table, +// builds the filter and returns an // internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) { + filter, err := buildFilter(tbl) + if err != nil { + return nil, err + } oc := &internal_models.OutputConfig{ Name: name, - Filter: buildFilter(tbl), + Filter: filter, } // Outputs don't support FieldDrop/FieldPass, so set to NameDrop/NamePass if len(oc.Filter.FieldDrop) > 0 { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index d78a8d6b8..1659cd6ec 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -26,27 +26,29 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"192.168.1.1"} - mConfig := &internal_models.InputConfig{ - Name: "memcached", - Filter: internal_models.Filter{ - NameDrop: []string{"metricname2"}, - NamePass: []string{"metricname1"}, - FieldDrop: []string{"other", "stuff"}, - FieldPass: []string{"some", "strings"}, - TagDrop: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, - }, + filter := internal_models.Filter{ + NameDrop: []string{"metricname2"}, + NamePass: []string{"metricname1"}, + FieldDrop: []string{"other", "stuff"}, + FieldPass: []string{"some", "strings"}, + TagDrop: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, }, - TagPass: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, - }, - }, - IsActive: true, }, + TagPass: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, + }, + IsActive: true, + } + assert.NoError(t, filter.CompileFilter()) + mConfig := &internal_models.InputConfig{ + Name: "memcached", + Filter: filter, Interval: 10 * time.Second, } mConfig.Tags = make(map[string]string) @@ -64,27 +66,29 @@ func TestConfig_LoadSingleInput(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &internal_models.InputConfig{ - Name: "memcached", - Filter: internal_models.Filter{ - NameDrop: []string{"metricname2"}, - NamePass: []string{"metricname1"}, - FieldDrop: []string{"other", "stuff"}, - FieldPass: []string{"some", "strings"}, - TagDrop: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, - }, + filter := internal_models.Filter{ + NameDrop: []string{"metricname2"}, + NamePass: []string{"metricname1"}, + FieldDrop: []string{"other", "stuff"}, + FieldPass: []string{"some", "strings"}, + TagDrop: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, }, - TagPass: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, - }, - }, - IsActive: true, }, + TagPass: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, + }, + IsActive: true, + } + assert.NoError(t, filter.CompileFilter()) + mConfig := &internal_models.InputConfig{ + Name: "memcached", + Filter: filter, Interval: 5 * time.Second, } mConfig.Tags = make(map[string]string) @@ -109,27 +113,29 @@ func TestConfig_LoadDirectory(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &internal_models.InputConfig{ - Name: "memcached", - Filter: internal_models.Filter{ - NameDrop: []string{"metricname2"}, - NamePass: []string{"metricname1"}, - FieldDrop: []string{"other", "stuff"}, - FieldPass: []string{"some", "strings"}, - TagDrop: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, - }, + filter := internal_models.Filter{ + NameDrop: []string{"metricname2"}, + NamePass: []string{"metricname1"}, + FieldDrop: []string{"other", "stuff"}, + FieldPass: []string{"some", "strings"}, + TagDrop: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, }, - TagPass: []internal_models.TagFilter{ - internal_models.TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, - }, - }, - IsActive: true, }, + TagPass: []internal_models.TagFilter{ + internal_models.TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, + }, + IsActive: true, + } + assert.NoError(t, filter.CompileFilter()) + mConfig := &internal_models.InputConfig{ + Name: "memcached", + Filter: filter, Interval: 5 * time.Second, } mConfig.Tags = make(map[string]string) diff --git a/internal/internal.go b/internal/internal.go index ff73aae84..4b8e1536f 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -139,59 +139,3 @@ func SnakeCase(in string) string { return string(out) } - -// Glob will test a string pattern, potentially containing globs, against a -// subject string. The result is a simple true/false, determining whether or -// not the glob pattern matched the subject text. -// -// Adapted from https://github.com/ryanuber/go-glob/blob/master/glob.go -// thanks Ryan Uber! -func Glob(pattern, measurement string) bool { - // Empty pattern can only match empty subject - if pattern == "" { - return measurement == pattern - } - - // If the pattern _is_ a glob, it matches everything - if pattern == "*" { - return true - } - - parts := strings.Split(pattern, "*") - - if len(parts) == 1 { - // No globs in pattern, so test for match - return pattern == measurement - } - - leadingGlob := strings.HasPrefix(pattern, "*") - trailingGlob := strings.HasSuffix(pattern, "*") - end := len(parts) - 1 - - for i, part := range parts { - switch i { - case 0: - if leadingGlob { - continue - } - if !strings.HasPrefix(measurement, part) { - return false - } - case end: - if len(measurement) > 0 { - return trailingGlob || strings.HasSuffix(measurement, part) - } - default: - if !strings.Contains(measurement, part) { - return false - } - } - - // Trim evaluated text from measurement as we loop over the pattern. - idx := strings.Index(measurement, part) + len(part) - measurement = measurement[idx:] - } - - // All parts of the pattern matched - return true -} diff --git a/internal/internal_test.go b/internal/internal_test.go index e4a5eed14..7ff64e87b 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -2,47 +2,6 @@ package internal import "testing" -func testGlobMatch(t *testing.T, pattern, subj string) { - if !Glob(pattern, subj) { - t.Errorf("%s should match %s", pattern, subj) - } -} - -func testGlobNoMatch(t *testing.T, pattern, subj string) { - if Glob(pattern, subj) { - t.Errorf("%s should not match %s", pattern, subj) - } -} - -func TestEmptyPattern(t *testing.T) { - testGlobMatch(t, "", "") - testGlobNoMatch(t, "", "test") -} - -func TestPatternWithoutGlobs(t *testing.T) { - testGlobMatch(t, "test", "test") -} - -func TestGlob(t *testing.T) { - for _, pattern := range []string{ - "*test", // Leading glob - "this*", // Trailing glob - "*is*a*", // Lots of globs - "**test**", // Double glob characters - "**is**a***test*", // Varying number of globs - } { - testGlobMatch(t, pattern, "this_is_a_test") - } - - for _, pattern := range []string{ - "test*", // Implicit substring match should fail - "*is", // Partial match should fail - "*no*", // Globs without a match between them should fail - } { - testGlobNoMatch(t, pattern, "this_is_a_test") - } -} - type SnakeTest struct { input string output string diff --git a/internal/models/filter.go b/internal/models/filter.go index e2b1377f4..d78492a5d 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -1,33 +1,104 @@ package internal_models import ( + "fmt" "strings" + "github.com/gobwas/glob" + "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" ) // TagFilter is the name of a tag, and the values on which to filter type TagFilter struct { Name string Filter []string + filter glob.Glob } // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { NameDrop []string + nameDrop glob.Glob NamePass []string + namePass glob.Glob FieldDrop []string + fieldDrop glob.Glob FieldPass []string + fieldPass glob.Glob TagDrop []TagFilter TagPass []TagFilter + TagExclude []string + tagExclude glob.Glob + TagInclude []string + tagInclude glob.Glob + IsActive bool } -func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool { +// Compile all Filter lists into glob.Glob objects. +func (f *Filter) CompileFilter() error { + var err error + f.nameDrop, err = compileFilter(f.NameDrop) + if err != nil { + return fmt.Errorf("Error compiling 'namedrop', %s", err) + } + f.namePass, err = compileFilter(f.NamePass) + if err != nil { + return fmt.Errorf("Error compiling 'namepass', %s", err) + } + + f.fieldDrop, err = compileFilter(f.FieldDrop) + if err != nil { + return fmt.Errorf("Error compiling 'fielddrop', %s", err) + } + f.fieldPass, err = compileFilter(f.FieldPass) + if err != nil { + return fmt.Errorf("Error compiling 'fieldpass', %s", err) + } + + f.tagExclude, err = compileFilter(f.TagExclude) + if err != nil { + return fmt.Errorf("Error compiling 'tagexclude', %s", err) + } + f.tagInclude, err = compileFilter(f.TagInclude) + if err != nil { + return fmt.Errorf("Error compiling 'taginclude', %s", err) + } + + for i, _ := range f.TagDrop { + f.TagDrop[i].filter, err = compileFilter(f.TagDrop[i].Filter) + if err != nil { + return fmt.Errorf("Error compiling 'tagdrop', %s", err) + } + } + for i, _ := range f.TagPass { + f.TagPass[i].filter, err = compileFilter(f.TagPass[i].Filter) + if err != nil { + return fmt.Errorf("Error compiling 'tagpass', %s", err) + } + } + return nil +} + +func compileFilter(filter []string) (glob.Glob, error) { + if len(filter) == 0 { + return nil, nil + } + var g glob.Glob + var err error + if len(filter) == 1 { + g, err = glob.Compile(filter[0]) + } else { + g, err = glob.Compile("{" + strings.Join(filter, ",") + "}") + } + return g, err +} + +func (f *Filter) ShouldMetricPass(metric telegraf.Metric) bool { if f.ShouldNamePass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) { return true } @@ -36,70 +107,51 @@ func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool { // ShouldFieldsPass returns true if the metric should pass, false if should drop // based on the drop/pass filter parameters -func (f Filter) ShouldNamePass(key string) bool { - if f.NamePass != nil { - for _, pat := range f.NamePass { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { - return true - } +func (f *Filter) ShouldNamePass(key string) bool { + if f.namePass != nil { + if f.namePass.Match(key) { + return true } return false } - if f.NameDrop != nil { - for _, pat := range f.NameDrop { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { - return false - } + if f.nameDrop != nil { + if f.nameDrop.Match(key) { + return false } - - return true } return true } // ShouldFieldsPass returns true if the metric should pass, false if should drop // based on the drop/pass filter parameters -func (f Filter) ShouldFieldsPass(key string) bool { - if f.FieldPass != nil { - for _, pat := range f.FieldPass { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { - return true - } +func (f *Filter) ShouldFieldsPass(key string) bool { + if f.fieldPass != nil { + if f.fieldPass.Match(key) { + return true } return false } - if f.FieldDrop != nil { - for _, pat := range f.FieldDrop { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { - return false - } + if f.fieldDrop != nil { + if f.fieldDrop.Match(key) { + return false } - - return true } return true } // ShouldTagsPass returns true if the metric should pass, false if should drop // based on the tagdrop/tagpass filter parameters -func (f Filter) ShouldTagsPass(tags map[string]string) bool { +func (f *Filter) ShouldTagsPass(tags map[string]string) bool { if f.TagPass != nil { for _, pat := range f.TagPass { + if pat.filter == nil { + continue + } if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return true - } + if pat.filter.Match(tagval) { + return true } } } @@ -108,11 +160,12 @@ func (f Filter) ShouldTagsPass(tags map[string]string) bool { if f.TagDrop != nil { for _, pat := range f.TagDrop { + if pat.filter == nil { + continue + } if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return false - } + if pat.filter.Match(tagval) { + return false } } } @@ -121,3 +174,23 @@ func (f Filter) ShouldTagsPass(tags map[string]string) bool { return true } + +// Apply TagInclude and TagExclude filters. +// modifies the tags map in-place. +func (f *Filter) FilterTags(tags map[string]string) { + if f.tagInclude != nil { + for k, _ := range tags { + if !f.tagInclude.Match(k) { + delete(tags, k) + } + } + } + + if f.tagExclude != nil { + for k, _ := range tags { + if f.tagExclude.Match(k) { + delete(tags, k) + } + } + } +} diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go index c69398494..a37416095 100644 --- a/internal/models/filter_test.go +++ b/internal/models/filter_test.go @@ -2,6 +2,11 @@ package internal_models import ( "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestFilter_Empty(t *testing.T) { @@ -28,6 +33,7 @@ func TestFilter_NamePass(t *testing.T) { f := Filter{ NamePass: []string{"foo*", "cpu_usage_idle"}, } + require.NoError(t, f.CompileFilter()) passes := []string{ "foo", @@ -61,6 +67,7 @@ func TestFilter_NameDrop(t *testing.T) { f := Filter{ NameDrop: []string{"foo*", "cpu_usage_idle"}, } + require.NoError(t, f.CompileFilter()) drops := []string{ "foo", @@ -94,6 +101,7 @@ func TestFilter_FieldPass(t *testing.T) { f := Filter{ FieldPass: []string{"foo*", "cpu_usage_idle"}, } + require.NoError(t, f.CompileFilter()) passes := []string{ "foo", @@ -127,6 +135,7 @@ func TestFilter_FieldDrop(t *testing.T) { f := Filter{ FieldDrop: []string{"foo*", "cpu_usage_idle"}, } + require.NoError(t, f.CompileFilter()) drops := []string{ "foo", @@ -169,6 +178,7 @@ func TestFilter_TagPass(t *testing.T) { f := Filter{ TagPass: filters, } + require.NoError(t, f.CompileFilter()) passes := []map[string]string{ {"cpu": "cpu-total"}, @@ -212,6 +222,7 @@ func TestFilter_TagDrop(t *testing.T) { f := Filter{ TagDrop: filters, } + require.NoError(t, f.CompileFilter()) drops := []map[string]string{ {"cpu": "cpu-total"}, @@ -241,3 +252,115 @@ func TestFilter_TagDrop(t *testing.T) { } } } + +func TestFilter_CompileFilterError(t *testing.T) { + f := Filter{ + NameDrop: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + f = Filter{ + NamePass: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + f = Filter{ + FieldDrop: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + f = Filter{ + FieldPass: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + f = Filter{ + TagExclude: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + f = Filter{ + TagInclude: []string{"", ""}, + } + assert.Error(t, f.CompileFilter()) + filters := []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"{foobar}"}, + }} + f = Filter{ + TagDrop: filters, + } + require.Error(t, f.CompileFilter()) + filters = []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"{foobar}"}, + }} + f = Filter{ + TagPass: filters, + } + require.Error(t, f.CompileFilter()) +} + +func TestFilter_ShouldMetricsPass(t *testing.T) { + m := testutil.TestMetric(1, "testmetric") + f := Filter{ + NameDrop: []string{"foobar"}, + } + require.NoError(t, f.CompileFilter()) + require.True(t, f.ShouldMetricPass(m)) + + m = testutil.TestMetric(1, "foobar") + require.False(t, f.ShouldMetricPass(m)) +} + +func TestFilter_FilterTagsNoMatches(t *testing.T) { + pretags := map[string]string{ + "host": "localhost", + "mytag": "foobar", + } + f := Filter{ + TagExclude: []string{"nomatch"}, + } + require.NoError(t, f.CompileFilter()) + + f.FilterTags(pretags) + assert.Equal(t, map[string]string{ + "host": "localhost", + "mytag": "foobar", + }, pretags) + + f = Filter{ + TagInclude: []string{"nomatch"}, + } + require.NoError(t, f.CompileFilter()) + + f.FilterTags(pretags) + assert.Equal(t, map[string]string{}, pretags) +} + +func TestFilter_FilterTagsMatches(t *testing.T) { + pretags := map[string]string{ + "host": "localhost", + "mytag": "foobar", + } + f := Filter{ + TagExclude: []string{"ho*"}, + } + require.NoError(t, f.CompileFilter()) + + f.FilterTags(pretags) + assert.Equal(t, map[string]string{ + "mytag": "foobar", + }, pretags) + + pretags = map[string]string{ + "host": "localhost", + "mytag": "foobar", + } + f = Filter{ + TagInclude: []string{"my*"}, + } + require.NoError(t, f.CompileFilter()) + + f.FilterTags(pretags) + assert.Equal(t, map[string]string{ + "mytag": "foobar", + }, pretags) +} diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 1e3d44a61..c76dffcdf 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -59,6 +59,19 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { ro.Lock() defer ro.Unlock() + // Filter any tagexclude/taginclude parameters before adding metric + if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 { + // In order to filter out tags, we need to create a new metric, since + // metrics are immutable once created. + tags := metric.Tags() + fields := metric.Fields() + t := metric.Time() + name := metric.Name() + ro.Config.Filter.FilterTags(tags) + // error is not possible if creating from another metric, so ignore. + metric, _ = telegraf.NewMetric(name, tags, fields, t) + } + if len(ro.metrics) < ro.MetricBufferLimit { ro.metrics = append(ro.metrics, metric) } else { diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index 6eee3bd11..9607f2417 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -29,6 +29,146 @@ var next5 = []telegraf.Metric{ testutil.TestMetric(101, "metric10"), } +// Test that NameDrop filters ger properly applied. +func TestRunningOutput_DropFilter(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + NameDrop: []string{"metric1", "metric2"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + for _, metric := range first5 { + ro.AddMetric(metric) + } + for _, metric := range next5 { + ro.AddMetric(metric) + } + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 8) +} + +// Test that NameDrop filters without a match do nothing. +func TestRunningOutput_PassFilter(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + NameDrop: []string{"metric1000", "foo*"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + for _, metric := range first5 { + ro.AddMetric(metric) + } + for _, metric := range next5 { + ro.AddMetric(metric) + } + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 10) +} + +// Test that tags are properly included +func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + TagInclude: []string{"nothing*"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + ro.AddMetric(first5[0]) + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 1) + assert.Empty(t, m.Metrics()[0].Tags()) +} + +// Test that tags are properly excluded +func TestRunningOutput_TagExcludeMatch(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + TagExclude: []string{"tag*"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + ro.AddMetric(first5[0]) + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 1) + assert.Len(t, m.Metrics()[0].Tags(), 0) +} + +// Test that tags are properly Excluded +func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + TagExclude: []string{"nothing*"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + ro.AddMetric(first5[0]) + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 1) + assert.Len(t, m.Metrics()[0].Tags(), 1) +} + +// Test that tags are properly included +func TestRunningOutput_TagIncludeMatch(t *testing.T) { + conf := &OutputConfig{ + Filter: Filter{ + IsActive: true, + TagInclude: []string{"tag*"}, + }, + } + assert.NoError(t, conf.Filter.CompileFilter()) + + m := &mockOutput{} + ro := NewRunningOutput("test", m, conf) + + ro.AddMetric(first5[0]) + assert.Len(t, m.Metrics(), 0) + + err := ro.Write() + assert.NoError(t, err) + assert.Len(t, m.Metrics(), 1) + assert.Len(t, m.Metrics()[0].Tags(), 1) +} + // Test that we can write metrics with simple default setup. func TestRunningOutputDefault(t *testing.T) { conf := &OutputConfig{