From 00382052668c9f4b213f864f001b78a10e80ce48 Mon Sep 17 00:00:00 2001 From: igomura <52306882+igomura@users.noreply.github.com> Date: Tue, 17 Mar 2020 15:53:03 -0700 Subject: [PATCH] Add dedup processor (#6792) --- plugins/processors/all/all.go | 1 + plugins/processors/dedup/README.md | 17 +++ plugins/processors/dedup/dedup.go | 105 +++++++++++++++++ plugins/processors/dedup/dedup_test.go | 154 +++++++++++++++++++++++++ 4 files changed, 277 insertions(+) create mode 100644 plugins/processors/dedup/README.md create mode 100644 plugins/processors/dedup/dedup.go create mode 100644 plugins/processors/dedup/dedup_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 98e9ccbfa..360b37dd0 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/clone" _ "github.com/influxdata/telegraf/plugins/processors/converter" _ "github.com/influxdata/telegraf/plugins/processors/date" + _ "github.com/influxdata/telegraf/plugins/processors/dedup" _ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" diff --git a/plugins/processors/dedup/README.md b/plugins/processors/dedup/README.md new file mode 100644 index 000000000..5e808bcd3 --- /dev/null +++ b/plugins/processors/dedup/README.md @@ -0,0 +1,17 @@ +# Dedup Processor Plugin + +If a metric sends the same value over successive intervals, suppress sending +the same value to the TSD until this many seconds have elapsed. This helps +graphs over narrow time ranges still see timeseries with suppressed datapoints. + +This feature can be used to reduce traffic when metric's value does not change over +time while maintain proper precision when value gets changed rapidly + +### Configuration + +```toml +[[processors.dedup]] + ## Maximum time to suppress output + dedup_interval = "600s" +``` + diff --git a/plugins/processors/dedup/dedup.go b/plugins/processors/dedup/dedup.go new file mode 100644 index 000000000..d3e04e070 --- /dev/null +++ b/plugins/processors/dedup/dedup.go @@ -0,0 +1,105 @@ +package dedup + +import ( + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/processors" +) + +var sampleConfig = ` + ## Maximum time to suppress output + dedup_interval = "600s" +` + +type Dedup struct { + DedupInterval internal.Duration `toml:"dedup_interval"` + FlushTime time.Time + Cache map[uint64]telegraf.Metric +} + +func (d *Dedup) SampleConfig() string { + return sampleConfig +} + +func (d *Dedup) Description() string { + return "Deduplicate repetitive metrics" +} + +// Remove single item from slice +func remove(slice []telegraf.Metric, i int) []telegraf.Metric { + slice[len(slice)-1], slice[i] = slice[i], slice[len(slice)-1] + return slice[:len(slice)-1] +} + +// Remove expired items from cache +func (d *Dedup) cleanup() { + // No need to cleanup cache too often. Lets save some CPU + if time.Since(d.FlushTime) < d.DedupInterval.Duration { + return + } + d.FlushTime = time.Now() + keep := make(map[uint64]telegraf.Metric, 0) + for id, metric := range d.Cache { + if time.Since(metric.Time()) < d.DedupInterval.Duration { + keep[id] = metric + } + } + d.Cache = keep +} + +// Save item to cache +func (d *Dedup) save(metric telegraf.Metric, id uint64) { + d.Cache[id] = metric.Copy() + d.Cache[id].Accept() +} + +// main processing method +func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + for idx, metric := range metrics { + id := metric.HashID() + m, ok := d.Cache[id] + + // If not in cache then just save it + if !ok { + d.save(metric, id) + continue + } + + // If cache item has expired then refresh it + if time.Since(m.Time()) >= d.DedupInterval.Duration { + d.save(metric, id) + continue + } + + // For each filed compare value with the cached one + changed := false + for _, f := range metric.FieldList() { + if value, ok := m.GetField(f.Key); ok && value != f.Value { + changed = true + continue + } + } + // If any field value has changed then refresh the cache + if changed { + d.save(metric, id) + continue + } + + // In any other case remove metric from the output + metrics = remove(metrics, idx) + } + d.cleanup() + return metrics +} + +func init() { + processors.Add("dedup", func() telegraf.Processor { + return &Dedup{ + DedupInterval: internal.Duration{Duration: 10 * time.Minute}, + FlushTime: time.Now(), + Cache: make(map[uint64]telegraf.Metric), + } + }) +} diff --git a/plugins/processors/dedup/dedup_test.go b/plugins/processors/dedup/dedup_test.go new file mode 100644 index 000000000..20a94ed30 --- /dev/null +++ b/plugins/processors/dedup/dedup_test.go @@ -0,0 +1,154 @@ +package dedup + +import ( + "github.com/stretchr/testify/require" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" +) + +func createMetric(name string, value int64, when time.Time) telegraf.Metric { + m, _ := metric.New(name, + map[string]string{"tag": "tag_value"}, + map[string]interface{}{"value": value}, + when, + ) + return m +} + +func createDedup(initTime time.Time) Dedup { + return Dedup{ + DedupInterval: internal.Duration{Duration: 10 * time.Minute}, + FlushTime: initTime, + Cache: make(map[uint64]telegraf.Metric), + } +} + +func assertCacheRefresh(t *testing.T, proc *Dedup, item telegraf.Metric) { + id := item.HashID() + name := item.Name() + // cache is not empty + require.NotEqual(t, 0, len(proc.Cache)) + // cache has metric with proper id + cache, present := proc.Cache[id] + require.True(t, present) + // cache has metric with proper name + require.Equal(t, name, cache.Name()) + // cached metric has proper field + cValue, present := cache.GetField("value") + require.True(t, present) + iValue, _ := item.GetField("value") + require.Equal(t, cValue, iValue) + // cached metric has proper timestamp + require.Equal(t, cache.Time(), item.Time()) +} + +func assertCacheHit(t *testing.T, proc *Dedup, item telegraf.Metric) { + id := item.HashID() + name := item.Name() + // cache is not empty + require.NotEqual(t, 0, len(proc.Cache)) + // cache has metric with proper id + cache, present := proc.Cache[id] + require.True(t, present) + // cache has metric with proper name + require.Equal(t, name, cache.Name()) + // cached metric has proper field + cValue, present := cache.GetField("value") + require.True(t, present) + iValue, _ := item.GetField("value") + require.Equal(t, cValue, iValue) + // cached metric did NOT change timestamp + require.NotEqual(t, cache.Time(), item.Time()) +} + +func assertMetricPassed(t *testing.T, target []telegraf.Metric, source telegraf.Metric) { + // target is not empty + require.NotEqual(t, 0, len(target)) + // target has metric with proper name + require.Equal(t, "m1", target[0].Name()) + // target metric has proper field + tValue, present := target[0].GetField("value") + require.True(t, present) + sValue, present := source.GetField("value") + require.Equal(t, tValue, sValue) + // target metric has proper timestamp + require.Equal(t, target[0].Time(), source.Time()) +} + +func assertMetricSuppressed(t *testing.T, target []telegraf.Metric, source telegraf.Metric) { + // target is empty + require.Equal(t, 0, len(target)) +} + +func TestProcRetainsMetric(t *testing.T) { + deduplicate := createDedup(time.Now()) + source := createMetric("m1", 1, time.Now()) + target := deduplicate.Apply(source) + + assertCacheRefresh(t, &deduplicate, source) + assertMetricPassed(t, target, source) +} + +func TestSuppressRepeatedValue(t *testing.T) { + deduplicate := createDedup(time.Now()) + // Create metric in the past + source := createMetric("m1", 1, time.Now().Add(-1*time.Second)) + target := deduplicate.Apply(source) + source = createMetric("m1", 1, time.Now()) + target = deduplicate.Apply(source) + + assertCacheHit(t, &deduplicate, source) + assertMetricSuppressed(t, target, source) +} + +func TestPassUpdatedValue(t *testing.T) { + deduplicate := createDedup(time.Now()) + // Create metric in the past + source := createMetric("m1", 1, time.Now().Add(-1*time.Second)) + target := deduplicate.Apply(source) + source = createMetric("m1", 2, time.Now()) + target = deduplicate.Apply(source) + + assertCacheRefresh(t, &deduplicate, source) + assertMetricPassed(t, target, source) +} + +func TestPassAfterCacheExpire(t *testing.T) { + deduplicate := createDedup(time.Now()) + // Create metric in the past + source := createMetric("m1", 1, time.Now().Add(-1*time.Hour)) + target := deduplicate.Apply(source) + source = createMetric("m1", 1, time.Now()) + target = deduplicate.Apply(source) + + assertCacheRefresh(t, &deduplicate, source) + assertMetricPassed(t, target, source) +} + +func TestCacheRetainsMetrics(t *testing.T) { + deduplicate := createDedup(time.Now()) + // Create metric in the past 3sec + source := createMetric("m1", 1, time.Now().Add(-3*time.Hour)) + deduplicate.Apply(source) + // Create metric in the past 2sec + source = createMetric("m1", 1, time.Now().Add(-2*time.Hour)) + deduplicate.Apply(source) + source = createMetric("m1", 1, time.Now()) + deduplicate.Apply(source) + + assertCacheRefresh(t, &deduplicate, source) +} + +func TestCacheShrink(t *testing.T) { + // Time offset is more than 2 * DedupInterval + deduplicate := createDedup(time.Now().Add(-2 * time.Hour)) + // Time offset is more than 1 * DedupInterval + source := createMetric("m1", 1, time.Now().Add(-1*time.Hour)) + deduplicate.Apply(source) + + require.Equal(t, 0, len(deduplicate.Cache)) +}