Add tag_limit processor (#6086)
This commit is contained in:
parent
edb05b58a0
commit
46b9000ef6
|
@ -11,6 +11,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
_ "github.com/influxdata/telegraf/plugins/processors/regex"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/rename"
|
_ "github.com/influxdata/telegraf/plugins/processors/rename"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/strings"
|
_ "github.com/influxdata/telegraf/plugins/processors/strings"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/tag_limit"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/unpivot"
|
_ "github.com/influxdata/telegraf/plugins/processors/unpivot"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Tag Limit Processor Plugin
|
||||||
|
|
||||||
|
Use the `tag_limit` processor to ensure that only a certain number of tags are
|
||||||
|
preserved for any given metric, and to choose the tags to preserve when the
|
||||||
|
number of tags appended by the data source is over the limit.
|
||||||
|
|
||||||
|
This can be useful when dealing with output systems (e.g. Stackdriver) that
|
||||||
|
impose hard limits on the number of tags/labels per metric or where high
|
||||||
|
levels of cardinality are computationally and/or financially expensive.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.tag_limit]]
|
||||||
|
## Maximum number of tags to preserve
|
||||||
|
limit = 3
|
||||||
|
|
||||||
|
## List of tags to preferentially preserve
|
||||||
|
keep = ["environment", "region"]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
```diff
|
||||||
|
- throughput month=Jun,environment=qa,region=us-east1,lower=10i,upper=1000i,mean=500i 1560540094000000000
|
||||||
|
+ throughput environment=qa,region=us-east1,lower=10i 1560540094000000000
|
||||||
|
```
|
|
@ -0,0 +1,86 @@
|
||||||
|
package taglimit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// sampleConfig is the example TOML configuration returned by
// TagLimit.SampleConfig.
const sampleConfig = `
## Maximum number of tags to preserve
limit = 10

## List of tags to preferentially preserve
keep = ["foo", "bar", "baz"]
`
|
||||||
|
|
||||||
|
// TagLimit is a processor that caps the number of tags carried by each
// metric, preferring to retain the tags whose keys are listed in Keep.
type TagLimit struct {
	// Limit is the maximum number of tags a metric may keep.
	Limit int `toml:"limit"`
	// Keep lists the tag keys that are preserved in preference to others.
	Keep []string `toml:"keep"`

	// init is set once initOnce has validated the configuration and
	// populated keepTags.
	init bool
	// keepTags holds the keys from Keep as map keys for constant-time
	// membership checks; the values are always the empty string.
	keepTags map[string]string
}
|
||||||
|
|
||||||
|
func (d *TagLimit) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *TagLimit) Description() string {
|
||||||
|
return "Restricts the number of tags that can pass through this filter and chooses which tags to preserve when over the limit."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *TagLimit) initOnce() error {
|
||||||
|
if d.init {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(d.Keep) > d.Limit {
|
||||||
|
return fmt.Errorf("%d keep tags is greater than %d total tag limit", len(d.Keep), d.Limit)
|
||||||
|
}
|
||||||
|
d.keepTags = make(map[string]string)
|
||||||
|
// convert list of tags-to-keep to a map so we can do constant-time lookups
|
||||||
|
for _, tag_key := range d.Keep {
|
||||||
|
d.keepTags[tag_key] = ""
|
||||||
|
}
|
||||||
|
d.init = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *TagLimit) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
err := d.initOnce()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! [processors.tag_limit] could not create tag_limit processor: %v", err)
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
for _, point := range in {
|
||||||
|
pointOriginalTags := point.TagList()
|
||||||
|
lenPointTags := len(pointOriginalTags)
|
||||||
|
if lenPointTags <= d.Limit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tagsToRemove := make([]string, lenPointTags-d.Limit)
|
||||||
|
removeIdx := 0
|
||||||
|
// remove extraneous tags, stop once we're at the limit
|
||||||
|
for _, t := range pointOriginalTags {
|
||||||
|
if _, ok := d.keepTags[t.Key]; !ok {
|
||||||
|
tagsToRemove[removeIdx] = t.Key
|
||||||
|
removeIdx++
|
||||||
|
lenPointTags--
|
||||||
|
}
|
||||||
|
if lenPointTags <= d.Limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, t := range tagsToRemove {
|
||||||
|
point.RemoveTag(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("tag_limit", func() telegraf.Processor {
|
||||||
|
return &TagLimit{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package taglimit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MustMetric(name string, tags map[string]string, fields map[string]interface{}, metricTime time.Time) telegraf.Metric {
|
||||||
|
if tags == nil {
|
||||||
|
tags = map[string]string{}
|
||||||
|
}
|
||||||
|
if fields == nil {
|
||||||
|
fields = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
m, _ := metric.New(name, tags, fields, metricTime)
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnderLimit(t *testing.T) {
|
||||||
|
currentTime := time.Now()
|
||||||
|
|
||||||
|
oneTags := make(map[string]string)
|
||||||
|
oneTags["foo"] = "bar"
|
||||||
|
|
||||||
|
tenTags := make(map[string]string)
|
||||||
|
tenTags["a"] = "bar"
|
||||||
|
tenTags["b"] = "bar"
|
||||||
|
tenTags["c"] = "bar"
|
||||||
|
tenTags["d"] = "bar"
|
||||||
|
tenTags["e"] = "bar"
|
||||||
|
tenTags["f"] = "bar"
|
||||||
|
tenTags["g"] = "bar"
|
||||||
|
tenTags["h"] = "bar"
|
||||||
|
tenTags["i"] = "bar"
|
||||||
|
tenTags["j"] = "bar"
|
||||||
|
|
||||||
|
tagLimitConfig := TagLimit{
|
||||||
|
Limit: 10,
|
||||||
|
Keep: []string{"foo", "bar"},
|
||||||
|
}
|
||||||
|
|
||||||
|
m1 := MustMetric("foo", oneTags, nil, currentTime)
|
||||||
|
m2 := MustMetric("bar", tenTags, nil, currentTime)
|
||||||
|
limitApply := tagLimitConfig.Apply(m1, m2)
|
||||||
|
assert.Equal(t, oneTags, limitApply[0].Tags(), "one tag")
|
||||||
|
assert.Equal(t, tenTags, limitApply[1].Tags(), "ten tags")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrim(t *testing.T) {
|
||||||
|
currentTime := time.Now()
|
||||||
|
|
||||||
|
threeTags := make(map[string]string)
|
||||||
|
threeTags["a"] = "foo"
|
||||||
|
threeTags["b"] = "bar"
|
||||||
|
threeTags["z"] = "baz"
|
||||||
|
|
||||||
|
tenTags := make(map[string]string)
|
||||||
|
tenTags["a"] = "foo"
|
||||||
|
tenTags["b"] = "bar"
|
||||||
|
tenTags["c"] = "baz"
|
||||||
|
tenTags["d"] = "abc"
|
||||||
|
tenTags["e"] = "def"
|
||||||
|
tenTags["f"] = "ghi"
|
||||||
|
tenTags["g"] = "jkl"
|
||||||
|
tenTags["h"] = "mno"
|
||||||
|
tenTags["i"] = "pqr"
|
||||||
|
tenTags["j"] = "stu"
|
||||||
|
|
||||||
|
tagLimitConfig := TagLimit{
|
||||||
|
Limit: 3,
|
||||||
|
Keep: []string{"a", "b"},
|
||||||
|
}
|
||||||
|
|
||||||
|
m1 := MustMetric("foo", threeTags, nil, currentTime)
|
||||||
|
m2 := MustMetric("bar", tenTags, nil, currentTime)
|
||||||
|
limitApply := tagLimitConfig.Apply(m1, m2)
|
||||||
|
assert.Equal(t, threeTags, limitApply[0].Tags(), "three tags")
|
||||||
|
trimmedTags := limitApply[1].Tags()
|
||||||
|
assert.Equal(t, 3, len(trimmedTags), "ten tags")
|
||||||
|
assert.Equal(t, "foo", trimmedTags["a"], "preserved: a")
|
||||||
|
assert.Equal(t, "bar", trimmedTags["b"], "preserved: b")
|
||||||
|
}
|
Loading…
Reference in New Issue