From 7553c8fd1362618a11f04c5578e8c9feb4e9c73d Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Fri, 28 Sep 2018 14:48:20 -0700 Subject: [PATCH] Remove metric recreation when filtering (#4767) --- agent/accumulator.go | 56 ++--- agent/accumulator_test.go | 26 +- docs/CONFIGURATION.md | 127 ++++++---- internal/config/config.go | 15 -- internal/models/filter.go | 129 +++++----- internal/models/filter_test.go | 176 +++++++------ internal/models/makemetric.go | 80 ++---- internal/models/running_aggregator.go | 62 ++--- internal/models/running_aggregator_test.go | 82 +++--- internal/models/running_input.go | 37 ++- internal/models/running_input_test.go | 98 ++++---- internal/models/running_output.go | 42 ++-- internal/models/running_output_test.go | 17 -- internal/models/running_processor.go | 18 +- internal/models/running_processor_test.go | 276 ++++++++++++++------- 15 files changed, 635 insertions(+), 606 deletions(-) diff --git a/agent/accumulator.go b/agent/accumulator.go index 51c213a81..05e99350b 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/selfstat" ) @@ -14,13 +15,13 @@ var ( type MetricMaker interface { Name() string - MakeMetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, - mType telegraf.ValueType, - t time.Time, - ) telegraf.Metric + MakeMetric(metric telegraf.Metric) telegraf.Metric +} + +type accumulator struct { + maker MetricMaker + metrics chan telegraf.Metric + precision time.Duration } func NewAccumulator( @@ -35,23 +36,13 @@ func NewAccumulator( return &acc } -type accumulator struct { - metrics chan telegraf.Metric - - maker MetricMaker - - precision time.Duration -} - func (ac *accumulator) AddFields( measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time, ) { - if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Untyped, ac.getTime(t)); m != nil { - ac.metrics <- m - } + ac.addMetric(measurement, tags, fields, telegraf.Untyped, t...) } func (ac *accumulator) AddGauge( @@ -60,9 +51,7 @@ func (ac *accumulator) AddGauge( tags map[string]string, t ...time.Time, ) { - if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Gauge, ac.getTime(t)); m != nil { - ac.metrics <- m - } + ac.addMetric(measurement, tags, fields, telegraf.Gauge, t...) } func (ac *accumulator) AddCounter( @@ -71,9 +60,7 @@ func (ac *accumulator) AddCounter( tags map[string]string, t ...time.Time, ) { - if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Counter, ac.getTime(t)); m != nil { - ac.metrics <- m - } + ac.addMetric(measurement, tags, fields, telegraf.Counter, t...) } func (ac *accumulator) AddSummary( @@ -82,9 +69,7 @@ func (ac *accumulator) AddSummary( tags map[string]string, t ...time.Time, ) { - if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil { - ac.metrics <- m - } + ac.addMetric(measurement, tags, fields, telegraf.Summary, t...) } func (ac *accumulator) AddHistogram( @@ -93,7 +78,21 @@ func (ac *accumulator) AddHistogram( tags map[string]string, t ...time.Time, ) { - if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil { + ac.addMetric(measurement, tags, fields, telegraf.Histogram, t...) +} + +func (ac *accumulator) addMetric( + measurement string, + tags map[string]string, + fields map[string]interface{}, + tp telegraf.ValueType, + t ...time.Time, +) { + m, err := metric.New(measurement, tags, fields, ac.getTime(t), tp) + if err != nil { + return + } + if m := ac.maker.MakeMetric(m); m != nil { ac.metrics <- m } } @@ -105,7 +104,6 @@ func (ac *accumulator) AddError(err error) { return } NErrors.Incr(1) - //TODO suppress/throttle consecutive duplicate errors? log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 22fa3e409..2bb08920f 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -134,26 +133,7 @@ type TestMetricMaker struct { func (tm *TestMetricMaker) Name() string { return "TestPlugin" } -func (tm *TestMetricMaker) MakeMetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, - mType telegraf.ValueType, - t time.Time, -) telegraf.Metric { - switch mType { - case telegraf.Untyped: - if m, err := metric.New(measurement, tags, fields, t); err == nil { - return m - } - case telegraf.Counter: - if m, err := metric.New(measurement, tags, fields, t, telegraf.Counter); err == nil { - return m - } - case telegraf.Gauge: - if m, err := metric.New(measurement, tags, fields, t, telegraf.Gauge); err == nil { - return m - } - } - return nil + +func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { + return metric } diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 39825376d..0698721e4 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -1,11 +1,14 @@ -# Telegraf Configuration +# Configuration -You can see the latest config file with all available plugins here: -[telegraf.conf](https://github.com/influxdata/telegraf/blob/master/etc/telegraf.conf) +Telegraf's configuration file is written using +[TOML](https://github.com/toml-lang/toml#toml). + +[View the telegraf.conf config file with all available +plugins](/etc/telegraf.conf). ## Generating a Configuration File -A default Telegraf config file can be auto-generated by telegraf: +A default config file can be generated by telegraf: ``` telegraf config > telegraf.conf @@ -18,7 +21,7 @@ To generate a file with specific inputs and outputs, you can use the telegraf --input-filter cpu:mem:net:swap --output-filter influxdb:kafka config ``` -## Environment Variables +### Environment Variables Environment variables can be used anywhere in the config file, simply prepend them with $. For strings the variable must be within quotes (ie, "$STR_VAR"), @@ -27,7 +30,7 @@ for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR) When using the `.deb` or `.rpm` packages, you can define environment variables in the `/etc/default/telegraf` file. -## Configuration file locations +### Configuration file locations The location of the configuration file can be set via the `--config` command line flag. @@ -40,13 +43,13 @@ On most systems, the default locations are `/etc/telegraf/telegraf.conf` for the main configuration file and `/etc/telegraf/telegraf.d` for the directory of configuration files. -# Global Tags +### Global Tags Global tags can be specified in the `[global_tags]` section of the config file in key="value" format. All metrics being gathered on this host will be tagged with the tags specified here. -## Agent Configuration +### Agent Configuration Telegraf has a few options you can configure under the `[agent]` section of the config. @@ -85,7 +88,7 @@ ie, a jitter of 5s and flush_interval 10s means flushes will happen every 10-15s * **hostname**: Override default hostname, if empty use os.Hostname(). * **omit_hostname**: If true, do no set the "host" tag in the telegraf agent. -## Input Configuration +### Input Configuration The following config parameters are available for all inputs: @@ -98,15 +101,15 @@ you can configure that here. * **name_suffix**: Specifies a suffix to attach to the measurement name. * **tags**: A map of tags to apply to a specific input's measurements. -The [measurement filtering](#measurement-filtering) parameters can be used to -limit what metrics are emitted from the input plugin. +The [metric filtering](#metric-filtering) parameters can be used to limit what metrics are +emitted from the input plugin. -## Output Configuration +### Output Configuration -The [measurement filtering](#measurement-filtering) parameters can be used to -limit what metrics are emitted from the output plugin. +The [metric filtering](#metric-filtering) parameters can be used to limit what metrics are +emitted from the output plugin. -## Aggregator Configuration +### Aggregator Configuration The following config parameters are available for all aggregators: @@ -125,63 +128,77 @@ aggregator and will not get sent to the output plugins. * **name_suffix**: Specifies a suffix to attach to the measurement name. * **tags**: A map of tags to apply to a specific input's measurements. -The [measurement filtering](#measurement-filtering) parameters can be used to -limit what metrics are handled by the aggregator. Excluded metrics are passed -downstream to the next aggregator. +The [metric filtering](#metric-filtering) parameters can be used to limit what metrics are +handled by the aggregator. Excluded metrics are passed downstream to the next +aggregator. -## Processor Configuration +### Processor Configuration The following config parameters are available for all processors: * **order**: This is the order in which the processor(s) get executed. If this is not specified then processor execution order will be random. -The [measurement filtering](#measurement-filtering) parameters can be used -to limit what metrics are handled by the processor. Excluded metrics are -passed downstream to the next processor. +The [metric filtering](#metric-filtering) parameters can be used to limit what metrics are +handled by the processor. Excluded metrics are passed downstream to the next +processor. -#### Measurement Filtering + +### Metric Filtering -Filters can be configured per input, output, processor, or aggregator, -see below for examples. +Metric filtering can be configured per plugin on any input, output, processor, +and aggregator plugin. Filters fall under two categories: Selectors and +Modifiers. -* **namepass**: -An array of glob pattern strings. Only points whose measurement name matches +#### Selectors + +Selector filters include or exclude entire metrics. When a metric is excluded +from a Input or an Output plugin, the metric is dropped. If a metric is +excluded from a Processor or Aggregator plugin, it is skips the plugin and is +sent onwards to the next stage of processing. + +- **namepass**: +An array of glob pattern strings. Only metrics whose measurement name matches a pattern in this list are emitted. -* **namedrop**: -The inverse of `namepass`. If a match is found the point is discarded. This -is tested on points after they have passed the `namepass` test. -* **fieldpass**: -An array of glob pattern strings. Only fields whose field key matches a -pattern in this list are emitted. -* **fielddrop**: -The inverse of `fieldpass`. Fields with a field key matching one of the -patterns will be discarded from the point. This is tested on points after -they have passed the `fieldpass` test. -* **tagpass**: -A table mapping tag keys to arrays of glob pattern strings. Only points + +- **namedrop**: +The inverse of `namepass`. If a match is found the metric is discarded. This +is tested on metrics after they have passed the `namepass` test. + +- **tagpass**: +A table mapping tag keys to arrays of glob pattern strings. Only metrics that contain a tag key in the table and a tag value matching one of its patterns is emitted. -* **tagdrop**: -The inverse of `tagpass`. If a match is found the point is discarded. This -is tested on points after they have passed the `tagpass` test. -* **taginclude**: + +- **tagdrop**: +The inverse of `tagpass`. If a match is found the metric is discarded. This +is tested on metrics after they have passed the `tagpass` test. + +#### Modifiers + +Modifier filters remove tags and fields from a metric. If all fields are +removed the metric is removed. + +- **fieldpass**: +An array of glob pattern strings. Only fields whose field key matches a +pattern in this list are emitted. + +- **fielddrop**: +The inverse of `fieldpass`. Fields with a field key matching one of the +patterns will be discarded from the metric. This is tested on metrics after +they have passed the `fieldpass` test. + +- **taginclude**: An array of glob pattern strings. Only tags with a tag key matching one of the patterns are emitted. In contrast to `tagpass`, which will pass an entire -point based on its tag, `taginclude` removes all non matching tags from the -point. This filter can be used on both inputs & outputs, but it is -_recommended_ to be used on inputs, as it is more efficient to filter out tags -at the ingestion point. -* **tagexclude**: +metric based on its tag, `taginclude` removes all non matching tags from the +metric. + +- **tagexclude**: The inverse of `taginclude`. Tags with a tag key matching one of the patterns -will be discarded from the point. +will be discarded from the metric. -**NOTE** Due to the way TOML is parsed, `tagpass` and `tagdrop` parameters -must be defined at the _end_ of the plugin definition, otherwise subsequent -plugin config options will be interpreted as part of the tagpass/tagdrop -tables. - -#### Input Configuration Examples +### Input Configuration Examples This is a full working config that will output CPU data to an InfluxDB instance at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output diff --git a/internal/config/config.go b/internal/config/config.go index d62536cf9..3d0510978 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -884,14 +884,6 @@ func (c *Config) addInput(name string, table *ast.Table) error { // builds the filter and returns a // models.AggregatorConfig to be inserted into models.RunningAggregator func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) { - unsupportedFields := []string{"tagexclude", "taginclude"} - for _, field := range unsupportedFields { - if _, ok := tbl.Fields[field]; ok { - return nil, fmt.Errorf("%s is not supported for aggregator plugins (%s).", - field, name) - } - } - conf := &models.AggregatorConfig{ Name: name, Delay: time.Millisecond * 100, @@ -989,13 +981,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err // models.ProcessorConfig to be inserted into models.RunningProcessor func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { conf := &models.ProcessorConfig{Name: name} - unsupportedFields := []string{"tagexclude", "taginclude", "fielddrop", "fieldpass"} - for _, field := range unsupportedFields { - if _, ok := tbl.Fields[field]; ok { - return nil, fmt.Errorf("%s is not supported for processor plugins (%s).", - field, name) - } - } if node, ok := tbl.Fields["order"]; ok { if kv, ok := node.(*ast.KeyValue); ok { diff --git a/internal/models/filter.go b/internal/models/filter.go index 2848ccf09..664a6ff06 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -3,6 +3,7 @@ package models import ( "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" ) @@ -93,45 +94,35 @@ func (f *Filter) Compile() error { return nil } -// Apply applies the filter to the given measurement name, fields map, and -// tags map. It will return false if the metric should be "filtered out", and -// true if the metric should "pass". -// It will modify tags & fields in-place if they need to be deleted. -func (f *Filter) Apply( - measurement string, - fields map[string]interface{}, - tags map[string]string, -) bool { +// Select returns true if the metric matches according to the +// namepass/namedrop and tagpass/tagdrop filters. The metric is not modified. +func (f *Filter) Select(metric telegraf.Metric) bool { if !f.isActive { return true } - // check if the measurement name should pass - if !f.shouldNamePass(measurement) { + if !f.shouldNamePass(metric.Name()) { return false } - // check if the tags should pass - if !f.shouldTagsPass(tags) { + if !f.shouldTagsPass(metric.TagList()) { return false } - // filter fields - for fieldkey, _ := range fields { - if !f.shouldFieldPass(fieldkey) { - delete(fields, fieldkey) - } - } - if len(fields) == 0 { - return false - } - - // filter tags - f.filterTags(tags) - return true } +// Modify removes any tags and fields from the metric according to the +// fieldpass/fielddrop and taginclude/tagexclude filters. +func (f *Filter) Modify(metric telegraf.Metric) { + if !f.isActive { + return + } + + f.filterFields(metric) + f.filterTags(metric) +} + // IsActive checking if filter is active func (f *Filter) IsActive() bool { return f.isActive @@ -140,7 +131,6 @@ func (f *Filter) IsActive() bool { // shouldNamePass returns true if the metric should pass, false if should drop // based on the drop/pass filter parameters func (f *Filter) shouldNamePass(key string) bool { - pass := func(f *Filter) bool { if f.namePass.Match(key) { return true @@ -169,44 +159,29 @@ func (f *Filter) shouldNamePass(key string) bool { // shouldFieldPass returns true if the metric should pass, false if should drop // based on the drop/pass filter parameters func (f *Filter) shouldFieldPass(key string) bool { - - pass := func(f *Filter) bool { - if f.fieldPass.Match(key) { - return true - } - return false - } - - drop := func(f *Filter) bool { - if f.fieldDrop.Match(key) { - return false - } - return true - } - if f.fieldPass != nil && f.fieldDrop != nil { - return pass(f) && drop(f) + return f.fieldPass.Match(key) && !f.fieldDrop.Match(key) } else if f.fieldPass != nil { - return pass(f) + return f.fieldPass.Match(key) } else if f.fieldDrop != nil { - return drop(f) + return !f.fieldDrop.Match(key) } - return true } // shouldTagsPass returns true if the metric should pass, false if should drop // based on the tagdrop/tagpass filter parameters -func (f *Filter) shouldTagsPass(tags map[string]string) bool { - +func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool { pass := func(f *Filter) bool { for _, pat := range f.TagPass { if pat.filter == nil { continue } - if tagval, ok := tags[pat.Name]; ok { - if pat.filter.Match(tagval) { - return true + for _, tag := range tags { + if tag.Key == pat.Name { + if pat.filter.Match(tag.Value) { + return true + } } } } @@ -218,9 +193,11 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool { if pat.filter == nil { continue } - if tagval, ok := tags[pat.Name]; ok { - if pat.filter.Match(tagval) { - return false + for _, tag := range tags { + if tag.Key == pat.Name { + if pat.filter.Match(tag.Value) { + return false + } } } } @@ -242,22 +219,42 @@ func (f *Filter) shouldTagsPass(tags map[string]string) bool { return true } -// Apply TagInclude and TagExclude filters. -// modifies the tags map in-place. -func (f *Filter) filterTags(tags map[string]string) { - if f.tagInclude != nil { - for k, _ := range tags { - if !f.tagInclude.Match(k) { - delete(tags, k) - } +// filterFields removes fields according to fieldpass/fielddrop. +func (f *Filter) filterFields(metric telegraf.Metric) { + filterKeys := []string{} + for _, field := range metric.FieldList() { + if !f.shouldFieldPass(field.Key) { + filterKeys = append(filterKeys, field.Key) } } - if f.tagExclude != nil { - for k, _ := range tags { - if f.tagExclude.Match(k) { - delete(tags, k) + for _, key := range filterKeys { + metric.RemoveField(key) + } +} + +// filterTags removes tags according to taginclude/tagexclude. +func (f *Filter) filterTags(metric telegraf.Metric) { + filterKeys := []string{} + if f.tagInclude != nil { + for _, tag := range metric.TagList() { + if !f.tagInclude.Match(tag.Key) { + filterKeys = append(filterKeys, tag.Key) } } } + for _, key := range filterKeys { + metric.RemoveTag(key) + } + + if f.tagExclude != nil { + for _, tag := range metric.TagList() { + if f.tagExclude.Match(tag.Key) { + filterKeys = append(filterKeys, tag.Key) + } + } + } + for _, key := range filterKeys { + metric.RemoveTag(key) + } } diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go index 46f16e835..16a147cad 100644 --- a/internal/models/filter_test.go +++ b/internal/models/filter_test.go @@ -2,17 +2,24 @@ package models import ( "testing" + "time" - "github.com/stretchr/testify/assert" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/require" ) func TestFilter_ApplyEmpty(t *testing.T) { f := Filter{} require.NoError(t, f.Compile()) - assert.False(t, f.IsActive()) + require.False(t, f.IsActive()) - assert.True(t, f.Apply("m", map[string]interface{}{"value": int64(1)}, map[string]string{})) + m, err := metric.New("m", + map[string]string{}, + map[string]interface{}{"value": int64(1)}, + time.Now()) + require.NoError(t, err) + require.True(t, f.Select(m)) } func TestFilter_ApplyTagsDontPass(t *testing.T) { @@ -27,11 +34,14 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) { } require.NoError(t, f.Compile()) require.NoError(t, f.Compile()) - assert.True(t, f.IsActive()) + require.True(t, f.IsActive()) - assert.False(t, f.Apply("m", + m, err := metric.New("m", + map[string]string{"cpu": "cpu-total"}, map[string]interface{}{"value": int64(1)}, - map[string]string{"cpu": "cpu-total"})) + time.Now()) + require.NoError(t, err) + require.False(t, f.Select(m)) } func TestFilter_ApplyDeleteFields(t *testing.T) { @@ -40,11 +50,19 @@ func TestFilter_ApplyDeleteFields(t *testing.T) { } require.NoError(t, f.Compile()) require.NoError(t, f.Compile()) - assert.True(t, f.IsActive()) + require.True(t, f.IsActive()) - fields := map[string]interface{}{"value": int64(1), "value2": int64(2)} - assert.True(t, f.Apply("m", fields, nil)) - assert.Equal(t, map[string]interface{}{"value2": int64(2)}, fields) + m, err := metric.New("m", + map[string]string{}, + map[string]interface{}{ + "value": int64(1), + "value2": int64(2), + }, + time.Now()) + require.NoError(t, err) + require.True(t, f.Select(m)) + f.Modify(m) + require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields()) } func TestFilter_ApplyDeleteAllFields(t *testing.T) { @@ -53,10 +71,19 @@ func TestFilter_ApplyDeleteAllFields(t *testing.T) { } require.NoError(t, f.Compile()) require.NoError(t, f.Compile()) - assert.True(t, f.IsActive()) + require.True(t, f.IsActive()) - fields := map[string]interface{}{"value": int64(1), "value2": int64(2)} - assert.False(t, f.Apply("m", fields, nil)) + m, err := metric.New("m", + map[string]string{}, + map[string]interface{}{ + "value": int64(1), + "value2": int64(2), + }, + time.Now()) + require.NoError(t, err) + require.True(t, f.Select(m)) + f.Modify(m) + require.Len(t, m.FieldList(), 0) } func TestFilter_Empty(t *testing.T) { @@ -230,20 +257,20 @@ func TestFilter_TagPass(t *testing.T) { } require.NoError(t, f.Compile()) - passes := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, + passes := [][]*telegraf.Tag{ + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-total"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-0"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-1"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-2"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_free"}}, } - drops := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, + drops := [][]*telegraf.Tag{ + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cputotal"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu0"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu1"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu2"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_used"}}, } for _, tags := range passes { @@ -274,20 +301,20 @@ func TestFilter_TagDrop(t *testing.T) { } require.NoError(t, f.Compile()) - drops := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, + drops := [][]*telegraf.Tag{ + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-total"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-0"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-1"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu-2"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_free"}}, } - passes := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, + passes := [][]*telegraf.Tag{ + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cputotal"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu0"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu1"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "cpu", Value: "cpu2"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "mem", Value: "mem_used"}}, } for _, tags := range passes { @@ -304,58 +331,70 @@ func TestFilter_TagDrop(t *testing.T) { } func TestFilter_FilterTagsNoMatches(t *testing.T) { - pretags := map[string]string{ - "host": "localhost", - "mytag": "foobar", - } + m, err := metric.New("m", + map[string]string{ + "host": "localhost", + "mytag": "foobar", + }, + map[string]interface{}{"value": int64(1)}, + time.Now()) + require.NoError(t, err) f := Filter{ TagExclude: []string{"nomatch"}, } require.NoError(t, f.Compile()) - f.filterTags(pretags) - assert.Equal(t, map[string]string{ + f.filterTags(m) + require.Equal(t, map[string]string{ "host": "localhost", "mytag": "foobar", - }, pretags) + }, m.Tags()) f = Filter{ TagInclude: []string{"nomatch"}, } require.NoError(t, f.Compile()) - f.filterTags(pretags) - assert.Equal(t, map[string]string{}, pretags) + f.filterTags(m) + require.Equal(t, map[string]string{}, m.Tags()) } func TestFilter_FilterTagsMatches(t *testing.T) { - pretags := map[string]string{ - "host": "localhost", - "mytag": "foobar", - } + m, err := metric.New("m", + map[string]string{ + "host": "localhost", + "mytag": "foobar", + }, + map[string]interface{}{"value": int64(1)}, + time.Now()) + require.NoError(t, err) f := Filter{ TagExclude: []string{"ho*"}, } require.NoError(t, f.Compile()) - f.filterTags(pretags) - assert.Equal(t, map[string]string{ + f.filterTags(m) + require.Equal(t, map[string]string{ "mytag": "foobar", - }, pretags) + }, m.Tags()) - pretags = map[string]string{ - "host": "localhost", - "mytag": "foobar", - } + m, err = metric.New("m", + map[string]string{ + "host": "localhost", + "mytag": "foobar", + }, + map[string]interface{}{"value": int64(1)}, + time.Now()) + require.NoError(t, err) f = Filter{ TagInclude: []string{"my*"}, } require.NoError(t, f.Compile()) - f.filterTags(pretags) - assert.Equal(t, map[string]string{ + f.filterTags(m) + require.Equal(t, map[string]string{ "mytag": "foobar", - }, pretags) + }, m.Tags()) } // TestFilter_FilterNamePassAndDrop used for check case when @@ -374,7 +413,7 @@ func TestFilter_FilterNamePassAndDrop(t *testing.T) { require.NoError(t, f.Compile()) for i, name := range inputData { - assert.Equal(t, f.shouldNamePass(name), expectedResult[i]) + require.Equal(t, f.shouldNamePass(name), expectedResult[i]) } } @@ -394,7 +433,7 @@ func TestFilter_FilterFieldPassAndDrop(t *testing.T) { require.NoError(t, f.Compile()) for i, field := range inputData { - assert.Equal(t, f.shouldFieldPass(field), expectedResult[i]) + require.Equal(t, f.shouldFieldPass(field), expectedResult[i]) } } @@ -402,12 +441,11 @@ func TestFilter_FilterFieldPassAndDrop(t *testing.T) { // both parameters were defined // see: https://github.com/influxdata/telegraf/issues/2860 func TestFilter_FilterTagsPassAndDrop(t *testing.T) { - - inputData := []map[string]string{ - {"tag1": "1", "tag2": "3"}, - {"tag1": "1", "tag2": "2"}, - {"tag1": "2", "tag2": "1"}, - {"tag1": "4", "tag2": "1"}, + inputData := [][]*telegraf.Tag{ + []*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "1"}, &telegraf.Tag{Key: "tag2", Value: "3"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "1"}, &telegraf.Tag{Key: "tag2", Value: "2"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "2"}, &telegraf.Tag{Key: "tag2", Value: "1"}}, + []*telegraf.Tag{&telegraf.Tag{Key: "tag1", Value: "4"}, &telegraf.Tag{Key: "tag2", Value: "1"}}, } expectedResult := []bool{false, true, false, false} @@ -438,7 +476,7 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) { require.NoError(t, f.Compile()) for i, tag := range inputData { - assert.Equal(t, f.shouldTagsPass(tag), expectedResult[i]) + require.Equal(t, f.shouldTagsPass(tag), expectedResult[i]) } } diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index b74e236cd..29ef5f452 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -1,86 +1,42 @@ package models import ( - "log" - "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" ) -// makemetric is used by both RunningAggregator & RunningInput -// to make metrics. -// nameOverride: override the name of the measurement being made. -// namePrefix: add this prefix to each measurement name. -// nameSuffix: add this suffix to each measurement name. -// pluginTags: these are tags that are specific to this plugin. -// daemonTags: these are daemon-wide global tags, and get applied after pluginTags. -// filter: this is a filter to apply to each metric being made. -// applyFilter: if false, the above filter is not applied to each metric. -// This is used by Aggregators, because aggregators use filters -// on incoming metrics instead of on created metrics. -// TODO refactor this to not have such a huge func signature. +// Makemetric applies new metric plugin and agent measurement and tag +// settings. func makemetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, + metric telegraf.Metric, nameOverride string, namePrefix string, nameSuffix string, - pluginTags map[string]string, - daemonTags map[string]string, - filter Filter, - applyFilter bool, - mType telegraf.ValueType, - t time.Time, + tags map[string]string, + globalTags map[string]string, ) telegraf.Metric { - if len(fields) == 0 || len(measurement) == 0 { - return nil - } - if tags == nil { - tags = make(map[string]string) + if len(nameOverride) != 0 { + metric.SetName(nameOverride) } - // Override measurement name if set - if len(nameOverride) != 0 { - measurement = nameOverride - } - // Apply measurement prefix and suffix if set if len(namePrefix) != 0 { - measurement = namePrefix + measurement + metric.AddPrefix(namePrefix) } if len(nameSuffix) != 0 { - measurement = measurement + nameSuffix + metric.AddSuffix(nameSuffix) } - // Apply plugin-wide tags if set - for k, v := range pluginTags { - if _, ok := tags[k]; !ok { - tags[k] = v + // Apply plugin-wide tags + for k, v := range tags { + if _, ok := metric.GetTag(k); !ok { + metric.AddTag(k, v) } } - // Apply daemon-wide tags if set - for k, v := range daemonTags { - if _, ok := tags[k]; !ok { - tags[k] = v + // Apply global tags + for k, v := range globalTags { + if _, ok := metric.GetTag(k); !ok { + metric.AddTag(k, v) } } - // Apply the metric filter(s) - // for aggregators, the filter does not get applied when the metric is made. - // instead, the filter is applied to metric incoming into the plugin. - // ie, it gets applied in the RunningAggregator.Apply function. - if applyFilter { - if ok := filter.Apply(measurement, fields, tags); !ok { - return nil - } - } - - m, err := metric.New(measurement, tags, fields, t, mType) - if err != nil { - log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) - return nil - } - - return m + return metric } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 8cb04e4f6..960fd3131 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -5,7 +5,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" ) type RunningAggregator struct { @@ -29,47 +28,32 @@ func NewRunningAggregator( } } -// AggregatorConfig containing configuration parameters for the running -// aggregator plugin. +// AggregatorConfig is the common config for all aggregators. type AggregatorConfig struct { - Name string + Name string + DropOriginal bool + Period time.Duration + Delay time.Duration - DropOriginal bool NameOverride string MeasurementPrefix string MeasurementSuffix string Tags map[string]string Filter Filter - - Period time.Duration - Delay time.Duration } func (r *RunningAggregator) Name() string { return "aggregators." + r.Config.Name } -func (r *RunningAggregator) MakeMetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, - mType telegraf.ValueType, - t time.Time, -) telegraf.Metric { +func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { m := makemetric( - measurement, - fields, - tags, + metric, r.Config.NameOverride, r.Config.MeasurementPrefix, r.Config.MeasurementSuffix, r.Config.Tags, - nil, - r.Config.Filter, - false, - mType, - t, - ) + nil) if m != nil { m.SetAggregate(true) @@ -78,27 +62,23 @@ func (r *RunningAggregator) MakeMetric( return m } -// Add applies the given metric to the aggregator. -// Before applying to the plugin, it will run any defined filters on the metric. -// Apply returns true if the original metric should be dropped. -func (r *RunningAggregator) Add(in telegraf.Metric) bool { - if r.Config.Filter.IsActive() { - // check if the aggregator should apply this metric - name := in.Name() - fields := in.Fields() - tags := in.Tags() - t := in.Time() - if ok := r.Config.Filter.Apply(name, fields, tags); !ok { - // aggregator should not apply this metric - return false - } - - in, _ = metric.New(name, tags, fields, t) +// Add a metric to the aggregator and return true if the original metric +// should be dropped. +func (r *RunningAggregator) Add(metric telegraf.Metric) bool { + if ok := r.Config.Filter.Select(metric); !ok { + return false } - r.metrics <- in + r.Config.Filter.Modify(metric) + if len(metric.FieldList()) == 0 { + return r.Config.DropOriginal + } + + r.metrics <- metric + return r.Config.DropOriginal } + func (r *RunningAggregator) add(in telegraf.Metric) { r.a.Add(in) } diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index cf92fe675..34d513646 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -7,9 +7,11 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAdd(t *testing.T) { @@ -25,13 +27,15 @@ func TestAdd(t *testing.T) { acc := testutil.Accumulator{} go ra.Run(&acc, make(chan struct{})) - m := ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now().Add(time.Millisecond*150), - ) + telegraf.Untyped) + require.NoError(t, err) + assert.False(t, ra.Add(m)) for { @@ -56,34 +60,37 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { acc := testutil.Accumulator{} go ra.Run(&acc, make(chan struct{})) - // metric before current period - m := ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now().Add(-time.Hour), - ) + telegraf.Untyped) + require.NoError(t, err) + assert.False(t, ra.Add(m)) // metric after current period - m = ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err = metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now().Add(time.Hour), - ) + telegraf.Untyped) + require.NoError(t, err) assert.False(t, ra.Add(m)) // "now" metric - m = ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err = metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now().Add(time.Millisecond*50), - ) + telegraf.Untyped) + require.NoError(t, err) assert.False(t, ra.Add(m)) for { @@ -115,13 +122,14 @@ func TestAddAndPushOnePeriod(t *testing.T) { ra.Run(&acc, shutdown) }() - m := ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now().Add(time.Millisecond*100), - ) + telegraf.Untyped) + require.NoError(t, err) assert.False(t, ra.Add(m)) for { @@ -146,23 +154,25 @@ func TestAddDropOriginal(t *testing.T) { }) assert.NoError(t, ra.Config.Filter.Compile()) - m := ra.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now(), - ) + telegraf.Untyped) + require.NoError(t, err) assert.True(t, ra.Add(m)) // this metric name doesn't match the filter, so Add will return false - m2 := ra.MakeMetric( - "foobar", - map[string]interface{}{"value": int(101)}, + m2, err := metric.New("foobar", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, time.Now(), - ) + telegraf.Untyped) + require.NoError(t, err) assert.False(t, ra.Add(m2)) } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index ffe0b5f59..fce2437ca 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -36,44 +36,39 @@ func NewRunningInput( } } -// InputConfig containing a name, interval, and filter +// InputConfig is the common config for all inputs. type InputConfig struct { - Name string + Name string + Interval time.Duration + NameOverride string MeasurementPrefix string MeasurementSuffix string Tags map[string]string Filter Filter - Interval time.Duration } func (r *RunningInput) Name() string { return "inputs." + r.Config.Name } -// MakeMetric either returns a metric, or returns nil if the metric doesn't -// need to be created (because of filtering, an error, etc.) -func (r *RunningInput) MakeMetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, - mType telegraf.ValueType, - t time.Time, -) telegraf.Metric { +func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { + if ok := r.Config.Filter.Select(metric); !ok { + return nil + } + + r.Config.Filter.Modify(metric) + if len(metric.FieldList()) == 0 { + return nil + } + m := makemetric( - measurement, - fields, - tags, + metric, r.Config.NameOverride, r.Config.MeasurementPrefix, r.Config.MeasurementSuffix, r.Config.Tags, - r.defaultTags, - r.Config.Filter, - true, - mType, - t, - ) + r.defaultTags) if r.trace && m != nil { s := influx.NewSerializer() diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index 4d016851a..b83f75ea9 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -17,13 +17,13 @@ func TestMakeMetricNoFields(t *testing.T) { Name: "TestRunningInput", }) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{}, now, - ) + telegraf.Untyped) + m = ri.MakeMetric(m) + require.NoError(t, err) assert.Nil(t, m) } @@ -34,16 +34,16 @@ func TestMakeMetricNilFields(t *testing.T) { Name: "TestRunningInput", }) - m := ri.MakeMetric( - "RITest", + m, err := metric.New("RITest", + map[string]string{}, map[string]interface{}{ - "value": int(101), + "value": int64(101), "nil": nil, }, - map[string]string{}, - telegraf.Untyped, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("RITest", map[string]string{}, @@ -69,13 +69,15 @@ func TestMakeMetricWithPluginTags(t *testing.T) { ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, - nil, - telegraf.Untyped, + m, err := metric.New("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("RITest", map[string]string{ @@ -104,13 +106,15 @@ func TestMakeMetricFilteredOut(t *testing.T) { assert.Equal(t, true, ri.Trace()) assert.NoError(t, ri.Config.Filter.Compile()) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, - nil, - telegraf.Untyped, + m, err := metric.New("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + m = ri.MakeMetric(m) + require.NoError(t, err) assert.Nil(t, m) } @@ -126,13 +130,15 @@ func TestMakeMetricWithDaemonTags(t *testing.T) { ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("RITest", map[string]string{ "foo": "bar", @@ -153,13 +159,15 @@ func TestMakeMetricNameOverride(t *testing.T) { NameOverride: "foobar", }) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("foobar", nil, map[string]interface{}{ @@ -178,13 +186,15 @@ func TestMakeMetricNamePrefix(t *testing.T) { MeasurementPrefix: "foobar_", }) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("foobar_RITest", nil, map[string]interface{}{ @@ -203,13 +213,15 @@ func TestMakeMetricNameSuffix(t *testing.T) { MeasurementSuffix: "_foobar", }) - m := ri.MakeMetric( - "RITest", - map[string]interface{}{"value": int(101)}, + m, err := metric.New("RITest", map[string]string{}, - telegraf.Untyped, + map[string]interface{}{ + "value": int64(101), + }, now, - ) + telegraf.Untyped) + require.NoError(t, err) + m = ri.MakeMetric(m) expected, err := metric.New("RITest_foobar", nil, map[string]interface{}{ diff --git a/internal/models/running_output.go b/internal/models/running_output.go index bad1f7659..0f2c138a6 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" - "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/selfstat" ) @@ -42,6 +41,12 @@ type RunningOutput struct { writeMutex sync.Mutex } +// OutputConfig containing name and filter +type OutputConfig struct { + Name string + Filter Filter +} + func NewRunningOutput( name string, output telegraf.Output, @@ -95,36 +100,25 @@ func NewRunningOutput( // AddMetric adds a metric to the output. This function can also write cached // points if FlushBufferWhenFull is true. -func (ro *RunningOutput) AddMetric(m telegraf.Metric) { - - if m == nil { +func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { + if ok := ro.Config.Filter.Select(metric); !ok { + ro.MetricsFiltered.Incr(1) return } - // Filter any tagexclude/taginclude parameters before adding metric - if ro.Config.Filter.IsActive() { - // In order to filter out tags, we need to create a new metric, since - // metrics are immutable once created. - name := m.Name() - tags := m.Tags() - fields := m.Fields() - t := m.Time() - tp := m.Type() - if ok := ro.Config.Filter.Apply(name, fields, tags); !ok { - ro.MetricsFiltered.Incr(1) - return - } - // error is not possible if creating from another metric, so ignore. - m, _ = metric.New(name, tags, fields, t, tp) + + ro.Config.Filter.Modify(metric) + if len(metric.FieldList()) == 0 { + return } if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { ro.aggMutex.Lock() - output.Add(m) + output.Add(metric) ro.aggMutex.Unlock() return } - ro.metrics.Add(m) + ro.metrics.Add(metric) if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) @@ -206,9 +200,3 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { } return err } - -// OutputConfig containing name and filter -type OutputConfig struct { - Name string - Filter Filter -} diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index bd39f2f9b..c55334218 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -75,23 +75,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) { } } -func TestAddingNilMetric(t *testing.T) { - conf := &OutputConfig{ - Filter: Filter{}, - } - - m := &mockOutput{} - ro := NewRunningOutput("test", m, conf, 1000, 10000) - - ro.AddMetric(nil) - ro.AddMetric(nil) - ro.AddMetric(nil) - - err := ro.Write() - assert.NoError(t, err) - assert.Len(t, m.Metrics(), 0) -} - // Test that NameDrop filters ger properly applied. func TestRunningOutput_DropFilter(t *testing.T) { conf := &OutputConfig{ diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 92d3d44d0..a210d9799 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -34,14 +34,18 @@ func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { ret := []telegraf.Metric{} for _, metric := range in { - if rp.Config.Filter.IsActive() { - // check if the filter should be applied to this metric - if ok := rp.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok { - // this means filter should not be applied - ret = append(ret, metric) - continue - } + // In processors when a filter selects a metric it is sent through the + // processor. Otherwise the metric continues downstream unmodified. + if ok := rp.Config.Filter.Select(metric); !ok { + ret = append(ret, metric) + continue } + + rp.Config.Filter.Modify(metric) + if len(metric.FieldList()) == 0 { + continue + } + // This metric should pass through the filter, so call the filter Apply // function and append results to the output slice. ret = append(ret, rp.Processor.Apply(metric)...) diff --git a/internal/models/running_processor_test.go b/internal/models/running_processor_test.go index 8a691a9b8..02db40460 100644 --- a/internal/models/running_processor_test.go +++ b/internal/models/running_processor_test.go @@ -1,117 +1,203 @@ package models import ( + "sort" "testing" + "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/telegraf/metric" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -type TestProcessor struct { +// MockProcessor is a Processor with an overrideable Apply implementation. +type MockProcessor struct { + ApplyF func(in ...telegraf.Metric) []telegraf.Metric } -func (f *TestProcessor) SampleConfig() string { return "" } -func (f *TestProcessor) Description() string { return "" } - -// Apply renames: -// "foo" to "fuz" -// "bar" to "baz" -// And it also drops measurements named "dropme" -func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { - out := make([]telegraf.Metric, 0) - for _, m := range in { - switch m.Name() { - case "foo": - out = append(out, testutil.TestMetric(1, "fuz")) - case "bar": - out = append(out, testutil.TestMetric(1, "baz")) - case "dropme": - // drop the metric! - default: - out = append(out, m) - } - } - return out +func (p *MockProcessor) SampleConfig() string { + return "" } -func NewTestRunningProcessor() *RunningProcessor { - out := &RunningProcessor{ - Name: "test", - Processor: &TestProcessor{}, - Config: &ProcessorConfig{Filter: Filter{}}, - } - return out +func (p *MockProcessor) Description() string { + return "" } -func TestRunningProcessor(t *testing.T) { - inmetrics := []telegraf.Metric{ - testutil.TestMetric(1, "foo"), - testutil.TestMetric(1, "bar"), - testutil.TestMetric(1, "baz"), - } - - expectedNames := []string{ - "fuz", - "baz", - "baz", - } - rfp := NewTestRunningProcessor() - filteredMetrics := rfp.Apply(inmetrics...) - - actualNames := []string{ - filteredMetrics[0].Name(), - filteredMetrics[1].Name(), - filteredMetrics[2].Name(), - } - assert.Equal(t, expectedNames, actualNames) +func (p *MockProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + return p.ApplyF(in...) } -func TestRunningProcessor_WithNameDrop(t *testing.T) { - inmetrics := []telegraf.Metric{ - testutil.TestMetric(1, "foo"), - testutil.TestMetric(1, "bar"), - testutil.TestMetric(1, "baz"), +// TagProcessor returns a Processor whose Apply function adds the tag and +// value. +func TagProcessor(key, value string) *MockProcessor { + return &MockProcessor{ + ApplyF: func(in ...telegraf.Metric) []telegraf.Metric { + for _, m := range in { + m.AddTag(key, value) + } + return in + }, } - - expectedNames := []string{ - "foo", - "baz", - "baz", - } - rfp := NewTestRunningProcessor() - - rfp.Config.Filter.NameDrop = []string{"foo"} - assert.NoError(t, rfp.Config.Filter.Compile()) - - filteredMetrics := rfp.Apply(inmetrics...) - - actualNames := []string{ - filteredMetrics[0].Name(), - filteredMetrics[1].Name(), - filteredMetrics[2].Name(), - } - assert.Equal(t, expectedNames, actualNames) } -func TestRunningProcessor_DroppedMetric(t *testing.T) { - inmetrics := []telegraf.Metric{ - testutil.TestMetric(1, "dropme"), - testutil.TestMetric(1, "foo"), - testutil.TestMetric(1, "bar"), +func Metric( + name string, + tags map[string]string, + fields map[string]interface{}, + tm time.Time, + tp ...telegraf.ValueType, +) telegraf.Metric { + m, err := metric.New(name, tags, fields, tm, tp...) + if err != nil { + panic(err) } - - expectedNames := []string{ - "fuz", - "baz", - } - rfp := NewTestRunningProcessor() - filteredMetrics := rfp.Apply(inmetrics...) - - actualNames := []string{ - filteredMetrics[0].Name(), - filteredMetrics[1].Name(), - } - assert.Equal(t, expectedNames, actualNames) + return m +} + +func TestRunningProcessor_Apply(t *testing.T) { + type args struct { + Processor telegraf.Processor + Config *ProcessorConfig + } + + tests := []struct { + name string + args args + input []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "inactive filter applies metrics", + args: args{ + Processor: TagProcessor("apply", "true"), + Config: &ProcessorConfig{ + Filter: Filter{}, + }, + }, + input: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{ + "apply": "true", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "filter applies", + args: args{ + Processor: TagProcessor("apply", "true"), + Config: &ProcessorConfig{ + Filter: Filter{ + NamePass: []string{"cpu"}, + }, + }, + }, + input: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{ + "apply": "true", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "filter doesn't apply", + args: args{ + Processor: TagProcessor("apply", "true"), + Config: &ProcessorConfig{ + Filter: Filter{ + NameDrop: []string{"cpu"}, + }, + }, + }, + input: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + Metric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rp := &RunningProcessor{ + Processor: tt.args.Processor, + Config: tt.args.Config, + } + rp.Config.Filter.Compile() + + actual := rp.Apply(tt.input...) + require.Equal(t, tt.expected, actual) + }) + } +} + +func TestRunningProcessor_Order(t *testing.T) { + rp1 := &RunningProcessor{ + Config: &ProcessorConfig{ + Order: 1, + }, + } + rp2 := &RunningProcessor{ + Config: &ProcessorConfig{ + Order: 2, + }, + } + rp3 := &RunningProcessor{ + Config: &ProcessorConfig{ + Order: 3, + }, + } + + procs := RunningProcessors{rp2, rp3, rp1} + sort.Sort(procs) + require.Equal(t, + RunningProcessors{rp1, rp2, rp3}, + procs) }