Adds support for removing/keeping tags from metrics

closes #706
This commit is contained in:
Cameron Sparr
2016-04-12 17:06:27 -06:00
parent f76739cb1b
commit 81d0a64d46
14 changed files with 611 additions and 236 deletions

View File

@@ -1,33 +1,104 @@
package internal_models
import (
"fmt"
"strings"
"github.com/gobwas/glob"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
// TagFilter is the name of a tag, and the values on which to filter
type TagFilter struct {
Name string
Filter []string
filter glob.Glob
}
// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
NameDrop []string
nameDrop glob.Glob
NamePass []string
namePass glob.Glob
FieldDrop []string
fieldDrop glob.Glob
FieldPass []string
fieldPass glob.Glob
TagDrop []TagFilter
TagPass []TagFilter
TagExclude []string
tagExclude glob.Glob
TagInclude []string
tagInclude glob.Glob
IsActive bool
}
func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool {
// Compile all Filter lists into glob.Glob objects.
func (f *Filter) CompileFilter() error {
var err error
f.nameDrop, err = compileFilter(f.NameDrop)
if err != nil {
return fmt.Errorf("Error compiling 'namedrop', %s", err)
}
f.namePass, err = compileFilter(f.NamePass)
if err != nil {
return fmt.Errorf("Error compiling 'namepass', %s", err)
}
f.fieldDrop, err = compileFilter(f.FieldDrop)
if err != nil {
return fmt.Errorf("Error compiling 'fielddrop', %s", err)
}
f.fieldPass, err = compileFilter(f.FieldPass)
if err != nil {
return fmt.Errorf("Error compiling 'fieldpass', %s", err)
}
f.tagExclude, err = compileFilter(f.TagExclude)
if err != nil {
return fmt.Errorf("Error compiling 'tagexclude', %s", err)
}
f.tagInclude, err = compileFilter(f.TagInclude)
if err != nil {
return fmt.Errorf("Error compiling 'taginclude', %s", err)
}
for i, _ := range f.TagDrop {
f.TagDrop[i].filter, err = compileFilter(f.TagDrop[i].Filter)
if err != nil {
return fmt.Errorf("Error compiling 'tagdrop', %s", err)
}
}
for i, _ := range f.TagPass {
f.TagPass[i].filter, err = compileFilter(f.TagPass[i].Filter)
if err != nil {
return fmt.Errorf("Error compiling 'tagpass', %s", err)
}
}
return nil
}
func compileFilter(filter []string) (glob.Glob, error) {
if len(filter) == 0 {
return nil, nil
}
var g glob.Glob
var err error
if len(filter) == 1 {
g, err = glob.Compile(filter[0])
} else {
g, err = glob.Compile("{" + strings.Join(filter, ",") + "}")
}
return g, err
}
func (f *Filter) ShouldMetricPass(metric telegraf.Metric) bool {
if f.ShouldNamePass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) {
return true
}
@@ -36,70 +107,51 @@ func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool {
// ShouldFieldsPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f Filter) ShouldNamePass(key string) bool {
if f.NamePass != nil {
for _, pat := range f.NamePass {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return true
}
func (f *Filter) ShouldNamePass(key string) bool {
if f.namePass != nil {
if f.namePass.Match(key) {
return true
}
return false
}
if f.NameDrop != nil {
for _, pat := range f.NameDrop {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return false
}
if f.nameDrop != nil {
if f.nameDrop.Match(key) {
return false
}
return true
}
return true
}
// ShouldFieldsPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f Filter) ShouldFieldsPass(key string) bool {
if f.FieldPass != nil {
for _, pat := range f.FieldPass {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return true
}
func (f *Filter) ShouldFieldsPass(key string) bool {
if f.fieldPass != nil {
if f.fieldPass.Match(key) {
return true
}
return false
}
if f.FieldDrop != nil {
for _, pat := range f.FieldDrop {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return false
}
if f.fieldDrop != nil {
if f.fieldDrop.Match(key) {
return false
}
return true
}
return true
}
// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass filter parameters
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
func (f *Filter) ShouldTagsPass(tags map[string]string) bool {
if f.TagPass != nil {
for _, pat := range f.TagPass {
if pat.filter == nil {
continue
}
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return true
}
if pat.filter.Match(tagval) {
return true
}
}
}
@@ -108,11 +160,12 @@ func (f Filter) ShouldTagsPass(tags map[string]string) bool {
if f.TagDrop != nil {
for _, pat := range f.TagDrop {
if pat.filter == nil {
continue
}
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return false
}
if pat.filter.Match(tagval) {
return false
}
}
}
@@ -121,3 +174,23 @@ func (f Filter) ShouldTagsPass(tags map[string]string) bool {
return true
}
// Apply TagInclude and TagExclude filters.
// modifies the tags map in-place.
func (f *Filter) FilterTags(tags map[string]string) {
if f.tagInclude != nil {
for k, _ := range tags {
if !f.tagInclude.Match(k) {
delete(tags, k)
}
}
}
if f.tagExclude != nil {
for k, _ := range tags {
if f.tagExclude.Match(k) {
delete(tags, k)
}
}
}
}

View File

@@ -2,6 +2,11 @@ package internal_models
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFilter_Empty(t *testing.T) {
@@ -28,6 +33,7 @@ func TestFilter_NamePass(t *testing.T) {
f := Filter{
NamePass: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.CompileFilter())
passes := []string{
"foo",
@@ -61,6 +67,7 @@ func TestFilter_NameDrop(t *testing.T) {
f := Filter{
NameDrop: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.CompileFilter())
drops := []string{
"foo",
@@ -94,6 +101,7 @@ func TestFilter_FieldPass(t *testing.T) {
f := Filter{
FieldPass: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.CompileFilter())
passes := []string{
"foo",
@@ -127,6 +135,7 @@ func TestFilter_FieldDrop(t *testing.T) {
f := Filter{
FieldDrop: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.CompileFilter())
drops := []string{
"foo",
@@ -169,6 +178,7 @@ func TestFilter_TagPass(t *testing.T) {
f := Filter{
TagPass: filters,
}
require.NoError(t, f.CompileFilter())
passes := []map[string]string{
{"cpu": "cpu-total"},
@@ -212,6 +222,7 @@ func TestFilter_TagDrop(t *testing.T) {
f := Filter{
TagDrop: filters,
}
require.NoError(t, f.CompileFilter())
drops := []map[string]string{
{"cpu": "cpu-total"},
@@ -241,3 +252,115 @@ func TestFilter_TagDrop(t *testing.T) {
}
}
}
func TestFilter_CompileFilterError(t *testing.T) {
f := Filter{
NameDrop: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
f = Filter{
NamePass: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
f = Filter{
FieldDrop: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
f = Filter{
FieldPass: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
f = Filter{
TagExclude: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
f = Filter{
TagInclude: []string{"", ""},
}
assert.Error(t, f.CompileFilter())
filters := []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"{foobar}"},
}}
f = Filter{
TagDrop: filters,
}
require.Error(t, f.CompileFilter())
filters = []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"{foobar}"},
}}
f = Filter{
TagPass: filters,
}
require.Error(t, f.CompileFilter())
}
func TestFilter_ShouldMetricsPass(t *testing.T) {
m := testutil.TestMetric(1, "testmetric")
f := Filter{
NameDrop: []string{"foobar"},
}
require.NoError(t, f.CompileFilter())
require.True(t, f.ShouldMetricPass(m))
m = testutil.TestMetric(1, "foobar")
require.False(t, f.ShouldMetricPass(m))
}
func TestFilter_FilterTagsNoMatches(t *testing.T) {
pretags := map[string]string{
"host": "localhost",
"mytag": "foobar",
}
f := Filter{
TagExclude: []string{"nomatch"},
}
require.NoError(t, f.CompileFilter())
f.FilterTags(pretags)
assert.Equal(t, map[string]string{
"host": "localhost",
"mytag": "foobar",
}, pretags)
f = Filter{
TagInclude: []string{"nomatch"},
}
require.NoError(t, f.CompileFilter())
f.FilterTags(pretags)
assert.Equal(t, map[string]string{}, pretags)
}
func TestFilter_FilterTagsMatches(t *testing.T) {
pretags := map[string]string{
"host": "localhost",
"mytag": "foobar",
}
f := Filter{
TagExclude: []string{"ho*"},
}
require.NoError(t, f.CompileFilter())
f.FilterTags(pretags)
assert.Equal(t, map[string]string{
"mytag": "foobar",
}, pretags)
pretags = map[string]string{
"host": "localhost",
"mytag": "foobar",
}
f = Filter{
TagInclude: []string{"my*"},
}
require.NoError(t, f.CompileFilter())
f.FilterTags(pretags)
assert.Equal(t, map[string]string{
"mytag": "foobar",
}, pretags)
}

View File

@@ -59,6 +59,19 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
ro.Lock()
defer ro.Unlock()
// Filter any tagexclude/taginclude parameters before adding metric
if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
// In order to filter out tags, we need to create a new metric, since
// metrics are immutable once created.
tags := metric.Tags()
fields := metric.Fields()
t := metric.Time()
name := metric.Name()
ro.Config.Filter.FilterTags(tags)
// error is not possible if creating from another metric, so ignore.
metric, _ = telegraf.NewMetric(name, tags, fields, t)
}
if len(ro.metrics) < ro.MetricBufferLimit {
ro.metrics = append(ro.metrics, metric)
} else {

View File

@@ -29,6 +29,146 @@ var next5 = []telegraf.Metric{
testutil.TestMetric(101, "metric10"),
}
// Test that NameDrop filters ger properly applied.
func TestRunningOutput_DropFilter(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
NameDrop: []string{"metric1", "metric2"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 8)
}
// Test that NameDrop filters without a match do nothing.
func TestRunningOutput_PassFilter(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
NameDrop: []string{"metric1000", "foo*"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
// Test that tags are properly included
func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
TagInclude: []string{"nothing*"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Empty(t, m.Metrics()[0].Tags())
}
// Test that tags are properly excluded
func TestRunningOutput_TagExcludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
TagExclude: []string{"tag*"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 0)
}
// Test that tags are properly Excluded
func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
TagExclude: []string{"nothing*"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 1)
}
// Test that tags are properly included
func TestRunningOutput_TagIncludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: true,
TagInclude: []string{"tag*"},
},
}
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 1)
}
// Test that we can write metrics with simple default setup.
func TestRunningOutputDefault(t *testing.T) {
conf := &OutputConfig{