Add clone processor (#6529)

This commit is contained in:
Adrián López 2019-10-21 20:59:32 +02:00 committed by Daniel Nelson
parent 59adbe8b39
commit 89c4c1d024
3 changed files with 181 additions and 0 deletions

View File

@ -0,0 +1,38 @@
# Clone Processor Plugin
The clone processor plugin create a copy of each metric passing through it,
preserving untouched the original metric and allowing modifications in the
copied one.
The modifications allowed are the ones supported by input plugins and aggregators:
* name_override
* name_prefix
* name_suffix
* tags
Select the metrics to modify using the standard
[measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering)
options.
Values of *name_override*, *name_prefix*, *name_suffix* and already present
*tags* with conflicting keys will be overwritten. Absent *tags* will be
created.
A typical use-case is gathering metrics once and cloning them to simulate
having several hosts (modifying ``host`` tag).
### Configuration:
```toml
# Apply metric modifications using override semantics.
[[processors.clone]]
## All modifications on inputs and aggregators can be overridden:
# name_override = "new_name"
# name_prefix = "new_name_prefix"
# name_suffix = "new_name_suffix"
## Tags to be added (all values must be strings)
# [processors.clone.tags]
# additional_tag = "tag_value"
```

View File

@ -0,0 +1,60 @@
package clone
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
var sampleConfig = `
## All modifications on inputs and aggregators can be overridden:
# name_override = "new_name"
# name_prefix = "new_name_prefix"
# name_suffix = "new_name_suffix"
## Tags to be added (all values must be strings)
# [processors.clone.tags]
# additional_tag = "tag_value"
`
type Clone struct {
NameOverride string
NamePrefix string
NameSuffix string
Tags map[string]string
}
func (c *Clone) SampleConfig() string {
return sampleConfig
}
func (c *Clone) Description() string {
return "Clone metrics and apply modifications."
}
func (c *Clone) Apply(in ...telegraf.Metric) []telegraf.Metric {
cloned := []telegraf.Metric{}
for _, metric := range in {
cloned = append(cloned, metric.Copy())
if len(c.NameOverride) > 0 {
metric.SetName(c.NameOverride)
}
if len(c.NamePrefix) > 0 {
metric.AddPrefix(c.NamePrefix)
}
if len(c.NameSuffix) > 0 {
metric.AddSuffix(c.NameSuffix)
}
for key, value := range c.Tags {
metric.AddTag(key, value)
}
}
return append(in, cloned...)
}
func init() {
processors.Add("clone", func() telegraf.Processor {
return &Clone{}
})
}

View File

@ -0,0 +1,83 @@
package clone
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1",
map[string]string{"metric_tag": "from_metric"},
map[string]interface{}{"value": int64(1)},
time.Now(),
)
return metric
}
func calculateProcessedTags(processor Clone, metric telegraf.Metric) map[string]string {
processed := processor.Apply(metric)
return processed[0].Tags()
}
func TestRetainsTags(t *testing.T) {
processor := Clone{}
tags := calculateProcessedTags(processor, createTestMetric())
value, present := tags["metric_tag"]
assert.True(t, present, "Tag of metric was not present")
assert.Equal(t, "from_metric", value, "Value of Tag was changed")
}
func TestAddTags(t *testing.T) {
processor := Clone{Tags: map[string]string{"added_tag": "from_config", "another_tag": ""}}
tags := calculateProcessedTags(processor, createTestMetric())
value, present := tags["added_tag"]
assert.True(t, present, "Additional Tag of metric was not present")
assert.Equal(t, "from_config", value, "Value of Tag was changed")
assert.Equal(t, 3, len(tags), "Should have one previous and two added tags.")
}
func TestOverwritesPresentTagValues(t *testing.T) {
processor := Clone{Tags: map[string]string{"metric_tag": "from_config"}}
tags := calculateProcessedTags(processor, createTestMetric())
value, present := tags["metric_tag"]
assert.True(t, present, "Tag of metric was not present")
assert.Equal(t, 1, len(tags), "Should only have one tag.")
assert.Equal(t, "from_config", value, "Value of Tag was not changed")
}
func TestOverridesName(t *testing.T) {
processor := Clone{NameOverride: "overridden"}
processed := processor.Apply(createTestMetric())
assert.Equal(t, "overridden", processed[0].Name(), "Name was not overridden")
assert.Equal(t, "m1", processed[1].Name(), "Original metric was modified")
}
func TestNamePrefix(t *testing.T) {
processor := Clone{NamePrefix: "Pre-"}
processed := processor.Apply(createTestMetric())
assert.Equal(t, "Pre-m1", processed[0].Name(), "Prefix was not applied")
assert.Equal(t, "m1", processed[1].Name(), "Original metric was modified")
}
func TestNameSuffix(t *testing.T) {
processor := Clone{NameSuffix: "-suff"}
processed := processor.Apply(createTestMetric())
assert.Equal(t, "m1-suff", processed[0].Name(), "Suffix was not applied")
assert.Equal(t, "m1", processed[1].Name(), "Original metric was modified")
}