Remove metric recreation when filtering (#4767)

This commit is contained in:
Daniel Nelson
2018-09-28 14:48:20 -07:00
committed by GitHub
parent cc64b14ab4
commit 7553c8fd13
15 changed files with 635 additions and 606 deletions

View File

@@ -884,14 +884,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// builds the filter and returns a
// models.AggregatorConfig to be inserted into models.RunningAggregator
func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
unsupportedFields := []string{"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
return nil, fmt.Errorf("%s is not supported for aggregator plugins (%s).",
field, name)
}
}
conf := &models.AggregatorConfig{
Name: name,
Delay: time.Millisecond * 100,
@@ -989,13 +981,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
// models.ProcessorConfig to be inserted into models.RunningProcessor
func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
unsupportedFields := []string{"tagexclude", "taginclude", "fielddrop", "fieldpass"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
return nil, fmt.Errorf("%s is not supported for processor plugins (%s).",
field, name)
}
}
if node, ok := tbl.Fields["order"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {

View File

@@ -3,6 +3,7 @@ package models
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)
@@ -93,45 +94,35 @@ func (f *Filter) Compile() error {
return nil
}
// Apply applies the filter to the given measurement name, fields map, and
// tags map. It will return false if the metric should be "filtered out", and
// true if the metric should "pass".
// It will modify tags & fields in-place if they need to be deleted.
func (f *Filter) Apply(
measurement string,
fields map[string]interface{},
tags map[string]string,
) bool {
// Select returns true if the metric matches according to the
// namepass/namedrop and tagpass/tagdrop filters. The metric is not modified.
func (f *Filter) Select(metric telegraf.Metric) bool {
if !f.isActive {
return true
}
// check if the measurement name should pass
if !f.shouldNamePass(measurement) {
if !f.shouldNamePass(metric.Name()) {
return false
}
// check if the tags should pass
if !f.shouldTagsPass(tags) {
if !f.shouldTagsPass(metric.TagList()) {
return false
}
// filter fields
for fieldkey, _ := range fields {
if !f.shouldFieldPass(fieldkey) {
delete(fields, fieldkey)
}
}
if len(fields) == 0 {
return false
}
// filter tags
f.filterTags(tags)
return true
}
// Modify removes any tags and fields from the metric according to the
// fieldpass/fielddrop and taginclude/tagexclude filters.
func (f *Filter) Modify(metric telegraf.Metric) {
if !f.isActive {
return
}
f.filterFields(metric)
f.filterTags(metric)
}
// IsActive checking if filter is active
func (f *Filter) IsActive() bool {
return f.isActive
@@ -140,7 +131,6 @@ func (f *Filter) IsActive() bool {
// shouldNamePass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f *Filter) shouldNamePass(key string) bool {
pass := func(f *Filter) bool {
if f.namePass.Match(key) {
return true
@@ -169,44 +159,29 @@ func (f *Filter) shouldNamePass(key string) bool {
// shouldFieldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f *Filter) shouldFieldPass(key string) bool {
pass := func(f *Filter) bool {
if f.fieldPass.Match(key) {
return true
}
return false
}
drop := func(f *Filter) bool {
if f.fieldDrop.Match(key) {
return false
}
return true
}
if f.fieldPass != nil && f.fieldDrop != nil {
return pass(f) && drop(f)
return f.fieldPass.Match(key) && !f.fieldDrop.Match(key)
} else if f.fieldPass != nil {
return pass(f)
return f.fieldPass.Match(key)
} else if f.fieldDrop != nil {
return drop(f)
return !f.fieldDrop.Match(key)
}
return true
}
// shouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass filter parameters
func (f *Filter) shouldTagsPass(tags map[string]string) bool {
func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
pass := func(f *Filter) bool {
for _, pat := range f.TagPass {
if pat.filter == nil {
continue
}
if tagval, ok := tags[pat.Name]; ok {
if pat.filter.Match(tagval) {
return true
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return true
}
}
}
}
@@ -218,9 +193,11 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool {
if pat.filter == nil {
continue
}
if tagval, ok := tags[pat.Name]; ok {
if pat.filter.Match(tagval) {
return false
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return false
}
}
}
}
@@ -242,22 +219,42 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool {
return true
}
// Apply TagInclude and TagExclude filters.
// modifies the tags map in-place.
func (f *Filter) filterTags(tags map[string]string) {
if f.tagInclude != nil {
for k, _ := range tags {
if !f.tagInclude.Match(k) {
delete(tags, k)
}
// filterFields removes fields according to fieldpass/fielddrop.
func (f *Filter) filterFields(metric telegraf.Metric) {
filterKeys := []string{}
for _, field := range metric.FieldList() {
if !f.shouldFieldPass(field.Key) {
filterKeys = append(filterKeys, field.Key)
}
}
if f.tagExclude != nil {
for k, _ := range tags {
if f.tagExclude.Match(k) {
delete(tags, k)
for _, key := range filterKeys {
metric.RemoveField(key)
}
}
// filterTags removes tags according to taginclude/tagexclude.
func (f *Filter) filterTags(metric telegraf.Metric) {
filterKeys := []string{}
if f.tagInclude != nil {
for _, tag := range metric.TagList() {
if !f.tagInclude.Match(tag.Key) {
filterKeys = append(filterKeys, tag.Key)
}
}
}
for _, key := range filterKeys {
metric.RemoveTag(key)
}
if f.tagExclude != nil {
for _, tag := range metric.TagList() {
if f.tagExclude.Match(tag.Key) {
filterKeys = append(filterKeys, tag.Key)
}
}
}
for _, key := range filterKeys {
metric.RemoveTag(key)
}
}

View File

@@ -2,17 +2,24 @@ package models
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
)
func TestFilter_ApplyEmpty(t *testing.T) {
f := Filter{}
require.NoError(t, f.Compile())
assert.False(t, f.IsActive())
require.False(t, f.IsActive())
assert.True(t, f.Apply("m", map[string]interface{}{"value": int64(1)}, map[string]string{}))
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
}
func TestFilter_ApplyTagsDontPass(t *testing.T) {
@@ -27,11 +34,14 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) {
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
assert.True(t, f.IsActive())
require.True(t, f.IsActive())
assert.False(t, f.Apply("m",
m, err := metric.New("m",
map[string]string{"cpu": "cpu-total"},
map[string]interface{}{"value": int64(1)},
map[string]string{"cpu": "cpu-total"}))
time.Now())
require.NoError(t, err)
require.False(t, f.Select(m))
}
func TestFilter_ApplyDeleteFields(t *testing.T) {
@@ -40,11 +50,19 @@ func TestFilter_ApplyDeleteFields(t *testing.T) {
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
assert.True(t, f.IsActive())
require.True(t, f.IsActive())
fields := map[string]interface{}{"value": int64(1), "value2": int64(2)}
assert.True(t, f.Apply("m", fields, nil))
assert.Equal(t, map[string]interface{}{"value2": int64(2)}, fields)
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{
"value": int64(1),
"value2": int64(2),
},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
f.Modify(m)
require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields())
}
func TestFilter_ApplyDeleteAllFields(t *testing.T) {
@@ -53,10 +71,19 @@ func TestFilter_ApplyDeleteAllFields(t *testing.T) {
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
assert.True(t, f.IsActive())
require.True(t, f.IsActive())
fields := map[string]interface{}{"value": int64(1), "value2": int64(2)}
assert.False(t, f.Apply("m", fields, nil))
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{
"value": int64(1),
"value2": int64(2),
},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
f.Modify(m)
require.Len(t, m.FieldList(), 0)
}
func TestFilter_Empty(t *testing.T) {
@@ -230,20 +257,20 @@ func TestFilter_TagPass(t *testing.T) {
}
require.NoError(t, f.Compile())
passes := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
passes := [][]*telegraf.Tag{
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-total"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-0"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-1"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-2"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_free"}},
}
drops := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
drops := [][]*telegraf.Tag{
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cputotal"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu0"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu1"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu2"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_used"}},
}
for _, tags := range passes {
@@ -274,20 +301,20 @@ func TestFilter_TagDrop(t *testing.T) {
}
require.NoError(t, f.Compile())
drops := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
drops := [][]*telegraf.Tag{
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-total"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-0"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-1"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-2"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_free"}},
}
passes := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
passes := [][]*telegraf.Tag{
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cputotal"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu0"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu1"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu2"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_used"}},
}
for _, tags := range passes {
@@ -304,58 +331,70 @@ func TestFilter_TagDrop(t *testing.T) {
}
func TestFilter_FilterTagsNoMatches(t *testing.T) {
pretags := map[string]string{
"host": "localhost",
"mytag": "foobar",
}
m, err := metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f := Filter{
TagExclude: []string{"nomatch"},
}
require.NoError(t, f.Compile())
f.filterTags(pretags)
assert.Equal(t, map[string]string{
f.filterTags(m)
require.Equal(t, map[string]string{
"host": "localhost",
"mytag": "foobar",
}, pretags)
}, m.Tags())
f = Filter{
TagInclude: []string{"nomatch"},
}
require.NoError(t, f.Compile())
f.filterTags(pretags)
assert.Equal(t, map[string]string{}, pretags)
f.filterTags(m)
require.Equal(t, map[string]string{}, m.Tags())
}
func TestFilter_FilterTagsMatches(t *testing.T) {
pretags := map[string]string{
"host": "localhost",
"mytag": "foobar",
}
m, err := metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f := Filter{
TagExclude: []string{"ho*"},
}
require.NoError(t, f.Compile())
f.filterTags(pretags)
assert.Equal(t, map[string]string{
f.filterTags(m)
require.Equal(t, map[string]string{
"mytag": "foobar",
}, pretags)
}, m.Tags())
pretags = map[string]string{
"host": "localhost",
"mytag": "foobar",
}
m, err = metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f = Filter{
TagInclude: []string{"my*"},
}
require.NoError(t, f.Compile())
f.filterTags(pretags)
assert.Equal(t, map[string]string{
f.filterTags(m)
require.Equal(t, map[string]string{
"mytag": "foobar",
}, pretags)
}, m.Tags())
}
// TestFilter_FilterNamePassAndDrop used for check case when
@@ -374,7 +413,7 @@ func TestFilter_FilterNamePassAndDrop(t *testing.T) {
require.NoError(t, f.Compile())
for i, name := range inputData {
assert.Equal(t, f.shouldNamePass(name), expectedResult[i])
require.Equal(t, f.shouldNamePass(name), expectedResult[i])
}
}
@@ -394,7 +433,7 @@ func TestFilter_FilterFieldPassAndDrop(t *testing.T) {
require.NoError(t, f.Compile())
for i, field := range inputData {
assert.Equal(t, f.shouldFieldPass(field), expectedResult[i])
require.Equal(t, f.shouldFieldPass(field), expectedResult[i])
}
}
@@ -402,12 +441,11 @@ func TestFilter_FilterFieldPassAndDrop(t *testing.T) {
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
inputData := []map[string]string{
{"tag1": "1", "tag2": "3"},
{"tag1": "1", "tag2": "2"},
{"tag1": "2", "tag2": "1"},
{"tag1": "4", "tag2": "1"},
inputData := [][]*telegraf.Tag{
[]*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "1"}, &telegraf.Tag{Key: "tag2", Value: "3"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "1"}, &telegraf.Tag{Key: "tag2", Value: "2"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "2"}, &telegraf.Tag{Key: "tag2", Value: "1"}},
[]*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "4"}, &telegraf.Tag{Key: "tag2", Value: "1"}},
}
expectedResult := []bool{false, true, false, false}
@@ -438,7 +476,7 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
require.NoError(t, f.Compile())
for i, tag := range inputData {
assert.Equal(t, f.shouldTagsPass(tag), expectedResult[i])
require.Equal(t, f.shouldTagsPass(tag), expectedResult[i])
}
}

View File

@@ -1,86 +1,42 @@
package models
import (
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// makemetric is used by both RunningAggregator & RunningInput
// to make metrics.
// nameOverride: override the name of the measurement being made.
// namePrefix: add this prefix to each measurement name.
// nameSuffix: add this suffix to each measurement name.
// pluginTags: these are tags that are specific to this plugin.
// daemonTags: these are daemon-wide global tags, and get applied after pluginTags.
// filter: this is a filter to apply to each metric being made.
// applyFilter: if false, the above filter is not applied to each metric.
// This is used by Aggregators, because aggregators use filters
// on incoming metrics instead of on created metrics.
// TODO refactor this to not have such a huge func signature.
// Makemetric applies new metric plugin and agent measurement and tag
// settings.
func makemetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
metric telegraf.Metric,
nameOverride string,
namePrefix string,
nameSuffix string,
pluginTags map[string]string,
daemonTags map[string]string,
filter Filter,
applyFilter bool,
mType telegraf.ValueType,
t time.Time,
tags map[string]string,
globalTags map[string]string,
) telegraf.Metric {
if len(fields) == 0 || len(measurement) == 0 {
return nil
}
if tags == nil {
tags = make(map[string]string)
if len(nameOverride) != 0 {
metric.SetName(nameOverride)
}
// Override measurement name if set
if len(nameOverride) != 0 {
measurement = nameOverride
}
// Apply measurement prefix and suffix if set
if len(namePrefix) != 0 {
measurement = namePrefix + measurement
metric.AddPrefix(namePrefix)
}
if len(nameSuffix) != 0 {
measurement = measurement + nameSuffix
metric.AddSuffix(nameSuffix)
}
// Apply plugin-wide tags if set
for k, v := range pluginTags {
if _, ok := tags[k]; !ok {
tags[k] = v
// Apply plugin-wide tags
for k, v := range tags {
if _, ok := metric.GetTag(k); !ok {
metric.AddTag(k, v)
}
}
// Apply daemon-wide tags if set
for k, v := range daemonTags {
if _, ok := tags[k]; !ok {
tags[k] = v
// Apply global tags
for k, v := range globalTags {
if _, ok := metric.GetTag(k); !ok {
metric.AddTag(k, v)
}
}
// Apply the metric filter(s)
// for aggregators, the filter does not get applied when the metric is made.
// instead, the filter is applied to metric incoming into the plugin.
// ie, it gets applied in the RunningAggregator.Apply function.
if applyFilter {
if ok := filter.Apply(measurement, fields, tags); !ok {
return nil
}
}
m, err := metric.New(measurement, tags, fields, t, mType)
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}
return m
return metric
}

View File

@@ -5,7 +5,6 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
type RunningAggregator struct {
@@ -29,47 +28,32 @@ func NewRunningAggregator(
}
}
// AggregatorConfig containing configuration parameters for the running
// aggregator plugin.
// AggregatorConfig is the common config for all aggregators.
type AggregatorConfig struct {
Name string
Name string
DropOriginal bool
Period time.Duration
Delay time.Duration
DropOriginal bool
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Period time.Duration
Delay time.Duration
}
func (r *RunningAggregator) Name() string {
return "aggregators." + r.Config.Name
}
func (r *RunningAggregator) MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric {
m := makemetric(
measurement,
fields,
tags,
metric,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
nil,
r.Config.Filter,
false,
mType,
t,
)
nil)
if m != nil {
m.SetAggregate(true)
@@ -78,27 +62,23 @@ func (r *RunningAggregator) MakeMetric(
return m
}
// Add applies the given metric to the aggregator.
// Before applying to the plugin, it will run any defined filters on the metric.
// Apply returns true if the original metric should be dropped.
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
if r.Config.Filter.IsActive() {
// check if the aggregator should apply this metric
name := in.Name()
fields := in.Fields()
tags := in.Tags()
t := in.Time()
if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
// aggregator should not apply this metric
return false
}
in, _ = metric.New(name, tags, fields, t)
// Add a metric to the aggregator and return true if the original metric
// should be dropped.
func (r *RunningAggregator) Add(metric telegraf.Metric) bool {
if ok := r.Config.Filter.Select(metric); !ok {
return false
}
r.metrics <- in
r.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
return r.Config.DropOriginal
}
r.metrics <- metric
return r.Config.DropOriginal
}
func (r *RunningAggregator) add(in telegraf.Metric) {
r.a.Add(in)
}

View File

@@ -7,9 +7,11 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAdd(t *testing.T) {
@@ -25,13 +27,15 @@ func TestAdd(t *testing.T) {
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*150),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m))
for {
@@ -56,34 +60,37 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))
// metric before current period
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(-time.Hour),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m))
// metric after current period
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err = metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Hour),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m))
// "now" metric
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err = metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m))
for {
@@ -115,13 +122,14 @@ func TestAddAndPushOnePeriod(t *testing.T) {
ra.Run(&acc, shutdown)
}()
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*100),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m))
for {
@@ -146,23 +154,25 @@ func TestAddDropOriginal(t *testing.T) {
})
assert.NoError(t, ra.Config.Filter.Compile())
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now(),
)
telegraf.Untyped)
require.NoError(t, err)
assert.True(t, ra.Add(m))
// this metric name doesn't match the filter, so Add will return false
m2 := ra.MakeMetric(
"foobar",
map[string]interface{}{"value": int(101)},
m2, err := metric.New("foobar",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
time.Now(),
)
telegraf.Untyped)
require.NoError(t, err)
assert.False(t, ra.Add(m2))
}

View File

@@ -36,44 +36,39 @@ func NewRunningInput(
}
}
// InputConfig containing a name, interval, and filter
// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Name string
Interval time.Duration
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}
func (r *RunningInput) Name() string {
return "inputs." + r.Config.Name
}
// MakeMetric either returns a metric, or returns nil if the metric doesn't
// need to be created (because of filtering, an error, etc.)
func (r *RunningInput) MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
if ok := r.Config.Filter.Select(metric); !ok {
return nil
}
r.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
return nil
}
m := makemetric(
measurement,
fields,
tags,
metric,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
r.defaultTags,
r.Config.Filter,
true,
mType,
t,
)
r.defaultTags)
if r.trace && m != nil {
s := influx.NewSerializer()

View File

@@ -17,13 +17,13 @@ func TestMakeMetricNoFields(t *testing.T) {
Name: "TestRunningInput",
})
m := ri.MakeMetric(
"RITest",
map[string]interface{}{},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{},
now,
)
telegraf.Untyped)
m = ri.MakeMetric(m)
require.NoError(t, err)
assert.Nil(t, m)
}
@@ -34,16 +34,16 @@ func TestMakeMetricNilFields(t *testing.T) {
Name: "TestRunningInput",
})
m := ri.MakeMetric(
"RITest",
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int(101),
"value": int64(101),
"nil": nil,
},
map[string]string{},
telegraf.Untyped,
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{},
@@ -69,13 +69,15 @@ func TestMakeMetricWithPluginTags(t *testing.T) {
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
nil,
telegraf.Untyped,
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{
@@ -104,13 +106,15 @@ func TestMakeMetricFilteredOut(t *testing.T) {
assert.Equal(t, true, ri.Trace())
assert.NoError(t, ri.Config.Filter.Compile())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
nil,
telegraf.Untyped,
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
m = ri.MakeMetric(m)
require.NoError(t, err)
assert.Nil(t, m)
}
@@ -126,13 +130,15 @@ func TestMakeMetricWithDaemonTags(t *testing.T) {
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{
"foo": "bar",
@@ -153,13 +159,15 @@ func TestMakeMetricNameOverride(t *testing.T) {
NameOverride: "foobar",
})
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("foobar",
nil,
map[string]interface{}{
@@ -178,13 +186,15 @@ func TestMakeMetricNamePrefix(t *testing.T) {
MeasurementPrefix: "foobar_",
})
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("foobar_RITest",
nil,
map[string]interface{}{
@@ -203,13 +213,15 @@ func TestMakeMetricNameSuffix(t *testing.T) {
MeasurementSuffix: "_foobar",
})
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
m, err := metric.New("RITest",
map[string]string{},
telegraf.Untyped,
map[string]interface{}{
"value": int64(101),
},
now,
)
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest_foobar",
nil,
map[string]interface{}{

View File

@@ -7,7 +7,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/buffer"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
)
@@ -42,6 +41,12 @@ type RunningOutput struct {
writeMutex sync.Mutex
}
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}
func NewRunningOutput(
name string,
output telegraf.Output,
@@ -95,36 +100,25 @@ func NewRunningOutput(
// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
if m == nil {
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ok := ro.Config.Filter.Select(metric); !ok {
ro.MetricsFiltered.Incr(1)
return
}
// Filter any tagexclude/taginclude parameters before adding metric
if ro.Config.Filter.IsActive() {
// In order to filter out tags, we need to create a new metric, since
// metrics are immutable once created.
name := m.Name()
tags := m.Tags()
fields := m.Fields()
t := m.Time()
tp := m.Type()
if ok := ro.Config.Filter.Apply(name, fields, tags); !ok {
ro.MetricsFiltered.Incr(1)
return
}
// error is not possible if creating from another metric, so ignore.
m, _ = metric.New(name, tags, fields, t, tp)
ro.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
return
}
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.aggMutex.Lock()
output.Add(m)
output.Add(metric)
ro.aggMutex.Unlock()
return
}
ro.metrics.Add(m)
ro.metrics.Add(metric)
if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize)
err := ro.write(batch)
@@ -206,9 +200,3 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
}
return err
}
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}

View File

@@ -75,23 +75,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
}
}
func TestAddingNilMetric(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(nil)
ro.AddMetric(nil)
ro.AddMetric(nil)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 0)
}
// Test that NameDrop filters ger properly applied.
func TestRunningOutput_DropFilter(t *testing.T) {
conf := &OutputConfig{

View File

@@ -34,14 +34,18 @@ func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
ret := []telegraf.Metric{}
for _, metric := range in {
if rp.Config.Filter.IsActive() {
// check if the filter should be applied to this metric
if ok := rp.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
// this means filter should not be applied
ret = append(ret, metric)
continue
}
// In processors when a filter selects a metric it is sent through the
// processor. Otherwise the metric continues downstream unmodified.
if ok := rp.Config.Filter.Select(metric); !ok {
ret = append(ret, metric)
continue
}
rp.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
continue
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rp.Processor.Apply(metric)...)

View File

@@ -1,117 +1,203 @@
package models
import (
"sort"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type TestProcessor struct {
// MockProcessor is a Processor with an overrideable Apply implementation.
type MockProcessor struct {
ApplyF func(in ...telegraf.Metric) []telegraf.Metric
}
func (f *TestProcessor) SampleConfig() string { return "" }
func (f *TestProcessor) Description() string { return "" }
// Apply renames:
// "foo" to "fuz"
// "bar" to "baz"
// And it also drops measurements named "dropme"
func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
out := make([]telegraf.Metric, 0)
for _, m := range in {
switch m.Name() {
case "foo":
out = append(out, testutil.TestMetric(1, "fuz"))
case "bar":
out = append(out, testutil.TestMetric(1, "baz"))
case "dropme":
// drop the metric!
default:
out = append(out, m)
}
}
return out
func (p *MockProcessor) SampleConfig() string {
return ""
}
func NewTestRunningProcessor() *RunningProcessor {
out := &RunningProcessor{
Name: "test",
Processor: &TestProcessor{},
Config: &ProcessorConfig{Filter: Filter{}},
}
return out
func (p *MockProcessor) Description() string {
return ""
}
func TestRunningProcessor(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
}
expectedNames := []string{
"fuz",
"baz",
"baz",
}
rfp := NewTestRunningProcessor()
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
func (p *MockProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
return p.ApplyF(in...)
}
func TestRunningProcessor_WithNameDrop(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
// TagProcessor returns a Processor whose Apply function adds the tag and
// value.
func TagProcessor(key, value string) *MockProcessor {
return &MockProcessor{
ApplyF: func(in ...telegraf.Metric) []telegraf.Metric {
for _, m := range in {
m.AddTag(key, value)
}
return in
},
}
expectedNames := []string{
"foo",
"baz",
"baz",
}
rfp := NewTestRunningProcessor()
rfp.Config.Filter.NameDrop = []string{"foo"}
assert.NoError(t, rfp.Config.Filter.Compile())
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}
func TestRunningProcessor_DroppedMetric(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "dropme"),
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
func Metric(
name string,
tags map[string]string,
fields map[string]interface{},
tm time.Time,
tp ...telegraf.ValueType,
) telegraf.Metric {
m, err := metric.New(name, tags, fields, tm, tp...)
if err != nil {
panic(err)
}
expectedNames := []string{
"fuz",
"baz",
}
rfp := NewTestRunningProcessor()
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
}
assert.Equal(t, expectedNames, actualNames)
return m
}
func TestRunningProcessor_Apply(t *testing.T) {
type args struct {
Processor telegraf.Processor
Config *ProcessorConfig
}
tests := []struct {
name string
args args
input []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "inactive filter applies metrics",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{},
},
},
input: []telegraf.Metric{
Metric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
Metric(
"cpu",
map[string]string{
"apply": "true",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
{
name: "filter applies",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{
NamePass: []string{"cpu"},
},
},
},
input: []telegraf.Metric{
Metric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
Metric(
"cpu",
map[string]string{
"apply": "true",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
{
name: "filter doesn't apply",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{
NameDrop: []string{"cpu"},
},
},
},
input: []telegraf.Metric{
Metric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
Metric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rp := &RunningProcessor{
Processor: tt.args.Processor,
Config: tt.args.Config,
}
rp.Config.Filter.Compile()
actual := rp.Apply(tt.input...)
require.Equal(t, tt.expected, actual)
})
}
}
func TestRunningProcessor_Order(t *testing.T) {
rp1 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 1,
},
}
rp2 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 2,
},
}
rp3 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 3,
},
}
procs := RunningProcessors{rp2, rp3, rp1}
sort.Sort(procs)
require.Equal(t,
RunningProcessors{rp1, rp2, rp3},
procs)
}