From 46b9000ef6dafc9241e49eddcdd73128ae4cd4eb Mon Sep 17 00:00:00 2001
From: memory
Date: Mon, 19 Aug 2019 23:54:40 -0400
Subject: [PATCH] Add tag_limit processor (#6086)

---
 plugins/processors/all/all.go                 |  1 +
 plugins/processors/tag_limit/README.md        | 27 ++++++
 plugins/processors/tag_limit/tag_limit.go     | 98 ++++++++++++++++++++
 .../processors/tag_limit/tag_limit_test.go    | 95 +++++++++++++++++++
 4 files changed, 221 insertions(+)
 create mode 100644 plugins/processors/tag_limit/README.md
 create mode 100644 plugins/processors/tag_limit/tag_limit.go
 create mode 100644 plugins/processors/tag_limit/tag_limit_test.go

diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go
index 5a61a2e80..47ff83f54 100644
--- a/plugins/processors/all/all.go
+++ b/plugins/processors/all/all.go
@@ -11,6 +11,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/processors/regex"
 	_ "github.com/influxdata/telegraf/plugins/processors/rename"
 	_ "github.com/influxdata/telegraf/plugins/processors/strings"
+	_ "github.com/influxdata/telegraf/plugins/processors/tag_limit"
 	_ "github.com/influxdata/telegraf/plugins/processors/topk"
 	_ "github.com/influxdata/telegraf/plugins/processors/unpivot"
 )
diff --git a/plugins/processors/tag_limit/README.md b/plugins/processors/tag_limit/README.md
new file mode 100644
index 000000000..b287f0f8d
--- /dev/null
+++ b/plugins/processors/tag_limit/README.md
@@ -0,0 +1,27 @@
+# Tag Limit Processor Plugin
+
+Use the `tag_limit` processor to ensure that only a certain number of tags are
+preserved for any given metric, and to choose the tags to preserve when the
+number of tags appended by the data source is over the limit.
+
+This can be useful when dealing with output systems (e.g. Stackdriver) that
+impose hard limits on the number of tags/labels per metric or where high
+levels of cardinality are computationally and/or financially expensive.
+
+### Configuration
+
+```toml
+[[processors.tag_limit]]
+  ## Maximum number of tags to preserve
+  limit = 3
+
+  ## List of tags to preferentially preserve
+  keep = ["environment", "region"]
+```
+
+### Example
+
+```diff
+- throughput month=Jun,environment=qa,region=us-east1,lower=10i,upper=1000i,mean=500i 1560540094000000000
++ throughput environment=qa,region=us-east1,lower=10i 1560540094000000000
+```
diff --git a/plugins/processors/tag_limit/tag_limit.go b/plugins/processors/tag_limit/tag_limit.go
new file mode 100644
index 000000000..41353a8f8
--- /dev/null
+++ b/plugins/processors/tag_limit/tag_limit.go
@@ -0,0 +1,98 @@
+package taglimit
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/processors"
+)
+
+const sampleConfig = `
+  ## Maximum number of tags to preserve
+  limit = 10
+
+  ## List of tags to preferentially preserve
+  keep = ["foo", "bar", "baz"]
+`
+
+// TagLimit is a processor that caps the number of tags carried by a metric.
+// Tags named in Keep are preserved; other tags are dropped until the metric
+// has at most Limit tags.
+type TagLimit struct {
+	Limit    int      `toml:"limit"`
+	Keep     []string `toml:"keep"`
+	init     bool
+	keepTags map[string]struct{}
+}
+
+// SampleConfig returns the sample TOML configuration for the processor.
+func (d *TagLimit) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line summary of what the processor does.
+func (d *TagLimit) Description() string {
+	return "Restricts the number of tags that can pass through this filter and chooses which tags to preserve when over the limit."
+}
+
+// initOnce validates the configuration and builds the keep-tag lookup set the
+// first time it succeeds. It returns an error when Keep lists more tags than
+// Limit allows; the processor then stays uninitialized and the check is
+// repeated on the next call.
+func (d *TagLimit) initOnce() error {
+	if d.init {
+		return nil
+	}
+	if len(d.Keep) > d.Limit {
+		return fmt.Errorf("%d keep tags is greater than %d total tag limit", len(d.Keep), d.Limit)
+	}
+	// convert list of tags-to-keep to a set so we can do constant-time lookups
+	d.keepTags = make(map[string]struct{}, len(d.Keep))
+	for _, tagKey := range d.Keep {
+		d.keepTags[tagKey] = struct{}{}
+	}
+	d.init = true
+	return nil
+}
+
+// Apply removes tags from every metric that carries more than Limit tags and
+// returns the same metrics. Tags are visited in TagList order and those not
+// named in Keep are removed until the metric is within the limit. If the
+// configuration is invalid the error is logged and the metrics are returned
+// unmodified.
+func (d *TagLimit) Apply(in ...telegraf.Metric) []telegraf.Metric {
+	if err := d.initOnce(); err != nil {
+		log.Printf("E! [processors.tag_limit] could not create tag_limit processor: %v", err)
+		return in
+	}
+	for _, point := range in {
+		pointOriginalTags := point.TagList()
+		excess := len(pointOriginalTags) - d.Limit
+		if excess <= 0 {
+			continue
+		}
+		// Collect the keys to drop first and remove them afterwards, so
+		// RemoveTag is never called while ranging over the tag list.
+		tagsToRemove := make([]string, 0, excess)
+		for _, t := range pointOriginalTags {
+			if len(tagsToRemove) == excess {
+				break
+			}
+			if _, ok := d.keepTags[t.Key]; !ok {
+				tagsToRemove = append(tagsToRemove, t.Key)
+			}
+		}
+		for _, key := range tagsToRemove {
+			point.RemoveTag(key)
+		}
+	}
+
+	return in
+}
+
+func init() {
+	processors.Add("tag_limit", func() telegraf.Processor {
+		return &TagLimit{}
+	})
+}
diff --git a/plugins/processors/tag_limit/tag_limit_test.go b/plugins/processors/tag_limit/tag_limit_test.go
new file mode 100644
index 000000000..9412d866b
--- /dev/null
+++ b/plugins/processors/tag_limit/tag_limit_test.go
@@ -0,0 +1,95 @@
+package taglimit
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+	"github.com/stretchr/testify/assert"
+)
+
+// MustMetric builds a metric for use in tests, substituting empty maps for nil
+// tags or fields. It panics if the metric cannot be created.
+func MustMetric(name string, tags map[string]string, fields map[string]interface{}, metricTime time.Time) telegraf.Metric {
+	if tags == nil {
+		tags = map[string]string{}
+	}
+	if fields == nil {
+		fields = map[string]interface{}{}
+	}
+	m, err := metric.New(name, tags, fields, metricTime)
+	if err != nil {
+		panic(err)
+	}
+	return m
+}
+
+// TestUnderLimit checks that metrics with no more tags than the limit pass
+// through unchanged.
+func TestUnderLimit(t *testing.T) {
+	currentTime := time.Now()
+
+	oneTags := make(map[string]string)
+	oneTags["foo"] = "bar"
+
+	tenTags := make(map[string]string)
+	tenTags["a"] = "bar"
+	tenTags["b"] = "bar"
+	tenTags["c"] = "bar"
+	tenTags["d"] = "bar"
+	tenTags["e"] = "bar"
+	tenTags["f"] = "bar"
+	tenTags["g"] = "bar"
+	tenTags["h"] = "bar"
+	tenTags["i"] = "bar"
+	tenTags["j"] = "bar"
+
+	tagLimitConfig := TagLimit{
+		Limit: 10,
+		Keep:  []string{"foo", "bar"},
+	}
+
+	m1 := MustMetric("foo", oneTags, nil, currentTime)
+	m2 := MustMetric("bar", tenTags, nil, currentTime)
+	limitApply := tagLimitConfig.Apply(m1, m2)
+	assert.Equal(t, oneTags, limitApply[0].Tags(), "one tag")
+	assert.Equal(t, tenTags, limitApply[1].Tags(), "ten tags")
+}
+
+// TestTrim checks that a metric over the limit is cut down to the limit while
+// the tags listed in Keep survive.
+func TestTrim(t *testing.T) {
+	currentTime := time.Now()
+
+	threeTags := make(map[string]string)
+	threeTags["a"] = "foo"
+	threeTags["b"] = "bar"
+	threeTags["z"] = "baz"
+
+	tenTags := make(map[string]string)
+	tenTags["a"] = "foo"
+	tenTags["b"] = "bar"
+	tenTags["c"] = "baz"
+	tenTags["d"] = "abc"
+	tenTags["e"] = "def"
+	tenTags["f"] = "ghi"
+	tenTags["g"] = "jkl"
+	tenTags["h"] = "mno"
+	tenTags["i"] = "pqr"
+	tenTags["j"] = "stu"
+
+	tagLimitConfig := TagLimit{
+		Limit: 3,
+		Keep:  []string{"a", "b"},
+	}
+
+	m1 := MustMetric("foo", threeTags, nil, currentTime)
+	m2 := MustMetric("bar", tenTags, nil, currentTime)
+	limitApply := tagLimitConfig.Apply(m1, m2)
+	assert.Equal(t, threeTags, limitApply[0].Tags(), "three tags")
+	trimmedTags := limitApply[1].Tags()
+	assert.Equal(t, 3, len(trimmedTags), "ten tags")
+	assert.Equal(t, "foo", trimmedTags["a"], "preserved: a")
+	assert.Equal(t, "bar", trimmedTags["b"], "preserved: b")
+}