Add dedup processor (#6792)

igomura 2020-03-17 15:53:03 -07:00 committed by GitHub
parent f6ea2598e5
commit 0038205266
4 changed files with 277 additions and 0 deletions


@@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/clone"
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/date"
_ "github.com/influxdata/telegraf/plugins/processors/dedup"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/parser"


@@ -0,0 +1,17 @@
# Dedup Processor Plugin

Suppress metrics whose field values have not changed since the last point
emitted for the same series. If a metric keeps reporting the same value over
successive intervals, the repeated points are dropped from the output until
`dedup_interval` has elapsed; after that, one point is passed through again, so
graphs over narrow time ranges still show the suppressed series. This reduces
traffic for slowly changing metrics while preserving full resolution as soon as
a value changes, as shown in the example below.

### Configuration
```toml
[[processors.dedup]]
## Maximum time to suppress output
dedup_interval = "600s"
```
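
### Example

A minimal sketch of the behavior, assuming a hypothetical `cpu` metric with a
single `usage` field collected every 10 seconds (the series names and values
below are illustrative only):

```text
# input
cpu,host=a usage=42 1600000000000000000
cpu,host=a usage=42 1600000010000000000
cpu,host=a usage=43 1600000020000000000

# output: the repeated middle point is suppressed; an unchanged value would
# still be re-emitted once dedup_interval has elapsed
cpu,host=a usage=42 1600000000000000000
cpu,host=a usage=43 1600000020000000000
```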


@@ -0,0 +1,105 @@
package dedup

import (
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/processors"
)

var sampleConfig = `
  ## Maximum time to suppress output
  dedup_interval = "600s"
`

type Dedup struct {
	DedupInterval internal.Duration `toml:"dedup_interval"`
	FlushTime     time.Time
	Cache         map[uint64]telegraf.Metric
}

func (d *Dedup) SampleConfig() string {
	return sampleConfig
}

func (d *Dedup) Description() string {
	return "Deduplicate repetitive metrics"
}

// Remove a single item from the slice by swapping it with the last element
func remove(slice []telegraf.Metric, i int) []telegraf.Metric {
	slice[len(slice)-1], slice[i] = slice[i], slice[len(slice)-1]
	return slice[:len(slice)-1]
}

// Remove expired items from the cache
func (d *Dedup) cleanup() {
	// No need to clean up the cache too often. Let's save some CPU
	if time.Since(d.FlushTime) < d.DedupInterval.Duration {
		return
	}
	d.FlushTime = time.Now()
	keep := make(map[uint64]telegraf.Metric)
	for id, metric := range d.Cache {
		if time.Since(metric.Time()) < d.DedupInterval.Duration {
			keep[id] = metric
		}
	}
	d.Cache = keep
}

// Save an item to the cache
func (d *Dedup) save(metric telegraf.Metric, id uint64) {
	d.Cache[id] = metric.Copy()
	d.Cache[id].Accept()
}

// Apply removes metrics whose field values repeat the cached values
// within dedup_interval from the output
func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
	idx := 0
	for idx < len(metrics) {
		metric := metrics[idx]
		id := metric.HashID()
		m, ok := d.Cache[id]

		// If not in cache then just save it
		if !ok {
			d.save(metric, id)
			idx++
			continue
		}

		// If the cached item has expired then refresh it
		if time.Since(m.Time()) >= d.DedupInterval.Duration {
			d.save(metric, id)
			idx++
			continue
		}

		// For each field compare the value with the cached one
		changed := false
		for _, f := range metric.FieldList() {
			if value, ok := m.GetField(f.Key); ok && value != f.Value {
				changed = true
				break
			}
		}

		// If any field value has changed then refresh the cache
		if changed {
			d.save(metric, id)
			idx++
			continue
		}

		// In any other case remove the metric from the output.
		// remove() swaps the last element into position idx, so the
		// index is not advanced here.
		metrics = remove(metrics, idx)
	}
	d.cleanup()
	return metrics
}

func init() {
	processors.Add("dedup", func() telegraf.Processor {
		return &Dedup{
			DedupInterval: internal.Duration{Duration: 10 * time.Minute},
			FlushTime:     time.Now(),
			Cache:         make(map[uint64]telegraf.Metric),
		}
	})
}


@@ -0,0 +1,154 @@
package dedup

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/metric"
	"github.com/stretchr/testify/require"
)

func createMetric(name string, value int64, when time.Time) telegraf.Metric {
	m, _ := metric.New(name,
		map[string]string{"tag": "tag_value"},
		map[string]interface{}{"value": value},
		when,
	)
	return m
}

func createDedup(initTime time.Time) Dedup {
	return Dedup{
		DedupInterval: internal.Duration{Duration: 10 * time.Minute},
		FlushTime:     initTime,
		Cache:         make(map[uint64]telegraf.Metric),
	}
}

func assertCacheRefresh(t *testing.T, proc *Dedup, item telegraf.Metric) {
	id := item.HashID()
	name := item.Name()
	// cache is not empty
	require.NotEqual(t, 0, len(proc.Cache))
	// cache has metric with proper id
	cache, present := proc.Cache[id]
	require.True(t, present)
	// cache has metric with proper name
	require.Equal(t, name, cache.Name())
	// cached metric has proper field
	cValue, present := cache.GetField("value")
	require.True(t, present)
	iValue, _ := item.GetField("value")
	require.Equal(t, cValue, iValue)
	// cached metric has proper timestamp
	require.Equal(t, cache.Time(), item.Time())
}

func assertCacheHit(t *testing.T, proc *Dedup, item telegraf.Metric) {
	id := item.HashID()
	name := item.Name()
	// cache is not empty
	require.NotEqual(t, 0, len(proc.Cache))
	// cache has metric with proper id
	cache, present := proc.Cache[id]
	require.True(t, present)
	// cache has metric with proper name
	require.Equal(t, name, cache.Name())
	// cached metric has proper field
	cValue, present := cache.GetField("value")
	require.True(t, present)
	iValue, _ := item.GetField("value")
	require.Equal(t, cValue, iValue)
	// cached metric did NOT change timestamp
	require.NotEqual(t, cache.Time(), item.Time())
}

func assertMetricPassed(t *testing.T, target []telegraf.Metric, source telegraf.Metric) {
	// target is not empty
	require.NotEqual(t, 0, len(target))
	// target has metric with proper name
	require.Equal(t, "m1", target[0].Name())
	// target metric has proper field
	tValue, present := target[0].GetField("value")
	require.True(t, present)
	sValue, present := source.GetField("value")
	require.True(t, present)
	require.Equal(t, tValue, sValue)
	// target metric has proper timestamp
	require.Equal(t, target[0].Time(), source.Time())
}

func assertMetricSuppressed(t *testing.T, target []telegraf.Metric, source telegraf.Metric) {
	// target is empty
	require.Equal(t, 0, len(target))
}

func TestProcRetainsMetric(t *testing.T) {
	deduplicate := createDedup(time.Now())
	source := createMetric("m1", 1, time.Now())
	target := deduplicate.Apply(source)

	assertCacheRefresh(t, &deduplicate, source)
	assertMetricPassed(t, target, source)
}

func TestSuppressRepeatedValue(t *testing.T) {
	deduplicate := createDedup(time.Now())
	// Create a metric in the past
	source := createMetric("m1", 1, time.Now().Add(-1*time.Second))
	target := deduplicate.Apply(source)
	source = createMetric("m1", 1, time.Now())
	target = deduplicate.Apply(source)

	assertCacheHit(t, &deduplicate, source)
	assertMetricSuppressed(t, target, source)
}

func TestPassUpdatedValue(t *testing.T) {
	deduplicate := createDedup(time.Now())
	// Create a metric in the past
	source := createMetric("m1", 1, time.Now().Add(-1*time.Second))
	target := deduplicate.Apply(source)
	source = createMetric("m1", 2, time.Now())
	target = deduplicate.Apply(source)

	assertCacheRefresh(t, &deduplicate, source)
	assertMetricPassed(t, target, source)
}

func TestPassAfterCacheExpire(t *testing.T) {
	deduplicate := createDedup(time.Now())
	// Create a metric older than dedup_interval
	source := createMetric("m1", 1, time.Now().Add(-1*time.Hour))
	target := deduplicate.Apply(source)
	source = createMetric("m1", 1, time.Now())
	target = deduplicate.Apply(source)

	assertCacheRefresh(t, &deduplicate, source)
	assertMetricPassed(t, target, source)
}

func TestCacheRetainsMetrics(t *testing.T) {
	deduplicate := createDedup(time.Now())
	// Create a metric 3 hours in the past
	source := createMetric("m1", 1, time.Now().Add(-3*time.Hour))
	deduplicate.Apply(source)
	// Create a metric 2 hours in the past
	source = createMetric("m1", 1, time.Now().Add(-2*time.Hour))
	deduplicate.Apply(source)
	source = createMetric("m1", 1, time.Now())
	deduplicate.Apply(source)

	assertCacheRefresh(t, &deduplicate, source)
}

func TestCacheShrink(t *testing.T) {
	// FlushTime is well past dedup_interval, so cleanup() will run
	deduplicate := createDedup(time.Now().Add(-2 * time.Hour))
	// The metric is older than dedup_interval, so cleanup() evicts it
	source := createMetric("m1", 1, time.Now().Add(-1*time.Hour))
	deduplicate.Apply(source)

	require.Equal(t, 0, len(deduplicate.Cache))
}