Handle multiple metrics with the same timestamp in dedup processor (#7439)

This commit is contained in:
reimda 2020-05-12 13:56:35 -06:00 committed by GitHub
parent 47e1cdc0ee
commit 934f6af99f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 4 deletions

View File

@ -75,10 +75,29 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
// For each field compare value with the cached one // For each field compare value with the cached one
changed := false changed := false
added := false
sametime := metric.Time() == m.Time()
for _, f := range metric.FieldList() { for _, f := range metric.FieldList() {
if value, ok := m.GetField(f.Key); ok && value != f.Value { if value, ok := m.GetField(f.Key); ok {
if value != f.Value {
changed = true changed = true
continue break
}
} else if sametime {
// This field isn't in the cached metric but it's the
// same series and timestamp. Merge it into the cached
// metric.
// Metrics have a ValueType that applies to all values
// in the metric. If an input needs to produce values
// with different ValueTypes but the same timestamp,
// they have to produce multiple metrics. (See the
// system input for an example.) In this case, dedup
// ignores the ValueTypes of the metrics and merges
// the fields into one metric for the dup check.
m.AddField(f.Key, f.Value)
added = true
} }
} }
// If any field value has changed then refresh the cache // If any field value has changed then refresh the cache
@ -87,6 +106,10 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
continue continue
} }
if sametime && added {
continue
}
// In any other case remove metric from the output // In any other case remove metric from the output
metrics = remove(metrics, idx) metrics = remove(metrics, idx)
} }

View File

@ -1,10 +1,11 @@
package dedup package dedup
import ( import (
"github.com/stretchr/testify/require"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
@ -152,3 +153,42 @@ func TestCacheShrink(t *testing.T) {
require.Equal(t, 0, len(deduplicate.Cache)) require.Equal(t, 0, len(deduplicate.Cache))
} }
func TestSameTimestamp(t *testing.T) {
now := time.Now()
dedup := createDedup(now)
var in telegraf.Metric
var out []telegraf.Metric
in, _ = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1}, // field
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
in, _ = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 1}, // different field
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
in, _ = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field different value
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
in, _ = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field same value
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{}, out) // drop
}