Fix filtering when both pass and drop match an item (#3036)

Adjust logic in functions responsible for passing metrics in order to be able
to process them correctly in case where pass and drop are defined together.
This commit is contained in:
DanKans 2017-07-21 18:53:57 +01:00 committed by Daniel Nelson
parent a2d4453269
commit d903a9142d
3 changed files with 74 additions and 11 deletions

View File

@ -134,8 +134,8 @@ is tested on points after they have passed the `namepass` test.
An array of glob pattern strings. Only fields whose field key matches a An array of glob pattern strings. Only fields whose field key matches a
pattern in this list are emitted. Not available for outputs. pattern in this list are emitted. Not available for outputs.
* **fielddrop**: * **fielddrop**:
The inverse of `fieldpass`. Fields with a field key matching one of the The inverse of `fieldpass`. Fields with a field key matching one of the
patterns will be discarded from the point. Not available for outputs. patterns will be discarded from the point. This is tested on points after they have passed the `fieldpass` test. Not available for outputs.
* **tagpass**: * **tagpass**:
A table mapping tag keys to arrays of glob pattern strings. Only points A table mapping tag keys to arrays of glob pattern strings. Only points
that contain a tag key in the table and a tag value matching one of its that contain a tag key in the table and a tag value matching one of its

View File

@ -132,6 +132,7 @@ func (f *Filter) Apply(
return true return true
} }
// IsActive checking if filter is active
func (f *Filter) IsActive() bool { func (f *Filter) IsActive() bool {
return f.isActive return f.isActive
} }
@ -139,36 +140,58 @@ func (f *Filter) IsActive() bool {
// shouldNamePass returns true if the metric should pass, false if should drop // shouldNamePass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters // based on the drop/pass filter parameters
func (f *Filter) shouldNamePass(key string) bool { func (f *Filter) shouldNamePass(key string) bool {
if f.namePass != nil {
pass := func(f *Filter) bool {
if f.namePass.Match(key) { if f.namePass.Match(key) {
return true return true
} }
return false return false
} }
if f.nameDrop != nil { drop := func(f *Filter) bool {
if f.nameDrop.Match(key) { if f.nameDrop.Match(key) {
return false return false
} }
return true
} }
if f.namePass != nil && f.nameDrop != nil {
return pass(f) && drop(f)
} else if f.namePass != nil {
return pass(f)
} else if f.nameDrop != nil {
return drop(f)
}
return true return true
} }
// shouldFieldPass returns true if the metric should pass, false if should drop // shouldFieldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters // based on the drop/pass filter parameters
func (f *Filter) shouldFieldPass(key string) bool { func (f *Filter) shouldFieldPass(key string) bool {
if f.fieldPass != nil {
pass := func(f *Filter) bool {
if f.fieldPass.Match(key) { if f.fieldPass.Match(key) {
return true return true
} }
return false return false
} }
if f.fieldDrop != nil { drop := func(f *Filter) bool {
if f.fieldDrop.Match(key) { if f.fieldDrop.Match(key) {
return false return false
} }
return true
} }
if f.fieldPass != nil && f.fieldDrop != nil {
return pass(f) && drop(f)
} else if f.fieldPass != nil {
return pass(f)
} else if f.fieldDrop != nil {
return drop(f)
}
return true return true
} }
@ -176,7 +199,7 @@ func (f *Filter) shouldFieldPass(key string) bool {
// based on the tagdrop/tagpass filter parameters // based on the tagdrop/tagpass filter parameters
func (f *Filter) shouldTagsPass(tags map[string]string) bool { func (f *Filter) shouldTagsPass(tags map[string]string) bool {
tagPass := func(f *Filter) bool { pass := func(f *Filter) bool {
for _, pat := range f.TagPass { for _, pat := range f.TagPass {
if pat.filter == nil { if pat.filter == nil {
continue continue
@ -190,7 +213,7 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool {
return false return false
} }
tagDrop := func(f *Filter) bool { drop := func(f *Filter) bool {
for _, pat := range f.TagDrop { for _, pat := range f.TagDrop {
if pat.filter == nil { if pat.filter == nil {
continue continue
@ -209,11 +232,11 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool {
if f.TagPass != nil && f.TagDrop != nil { if f.TagPass != nil && f.TagDrop != nil {
// return true only in case when tag pass and won't be dropped (true, true). // return true only in case when tag pass and won't be dropped (true, true).
// in case when the same tag should be passed and dropped it will be dropped (true, false). // in case when the same tag should be passed and dropped it will be dropped (true, false).
return tagPass(f) && tagDrop(f) return pass(f) && drop(f)
} else if f.TagPass != nil { } else if f.TagPass != nil {
return tagPass(f) return pass(f)
} else if f.TagDrop != nil { } else if f.TagDrop != nil {
return tagDrop(f) return drop(f)
} }
return true return true

View File

@ -358,6 +358,46 @@ func TestFilter_FilterTagsMatches(t *testing.T) {
}, pretags) }, pretags)
} }
// TestFilter_FilterNamePassAndDrop used for check case when
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterNamePassAndDrop(t *testing.T) {
inputData := []string{"name1", "name2", "name3", "name4"}
expectedResult := []bool{false, true, false, false}
f := Filter{
NamePass: []string{"name1", "name2"},
NameDrop: []string{"name1", "name3"},
}
require.NoError(t, f.Compile())
for i, name := range inputData {
assert.Equal(t, f.shouldNamePass(name), expectedResult[i])
}
}
// TestFilter_FilterFieldPassAndDrop used for check case when
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterFieldPassAndDrop(t *testing.T) {
inputData := []string{"field1", "field2", "field3", "field4"}
expectedResult := []bool{false, true, false, false}
f := Filter{
FieldPass: []string{"field1", "field2"},
FieldDrop: []string{"field1", "field3"},
}
require.NoError(t, f.Compile())
for i, field := range inputData {
assert.Equal(t, f.shouldFieldPass(field), expectedResult[i])
}
}
// TestFilter_FilterTagsPassAndDrop used for check case when // TestFilter_FilterTagsPassAndDrop used for check case when
// both parameters were defined // both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860 // see: https://github.com/influxdata/telegraf/issues/2860