diff --git a/plugins/processors/dedup/dedup.go b/plugins/processors/dedup/dedup.go index c0d40f434..3dd7516a6 100644 --- a/plugins/processors/dedup/dedup.go +++ b/plugins/processors/dedup/dedup.go @@ -75,10 +75,29 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { // For each field compare value with the cached one changed := false + added := false + sametime := metric.Time() == m.Time() for _, f := range metric.FieldList() { - if value, ok := m.GetField(f.Key); ok && value != f.Value { - changed = true - continue + if value, ok := m.GetField(f.Key); ok { + if value != f.Value { + changed = true + break + } + } else if sametime { + // This field isn't in the cached metric but it's the + // same series and timestamp. Merge it into the cached + // metric. + + // Metrics have a ValueType that applies to all values + // in the metric. If an input needs to produce values + // with different ValueTypes but the same timestamp, + // they have to produce multiple metrics. (See the + // system input for an example.) In this case, dedup + // ignores the ValueTypes of the metrics and merges + // the fields into one metric for the dup check. + + m.AddField(f.Key, f.Value) + added = true } } // If any field value has changed then refresh the cache @@ -87,6 +106,10 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { continue } + if sametime && added { + continue + } + // In any other case remove metric from the output metrics = remove(metrics, idx) } diff --git a/plugins/processors/dedup/dedup_test.go b/plugins/processors/dedup/dedup_test.go index 20a94ed30..cae2bf1a5 100644 --- a/plugins/processors/dedup/dedup_test.go +++ b/plugins/processors/dedup/dedup_test.go @@ -1,10 +1,11 @@ package dedup import ( - "github.com/stretchr/testify/require" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" @@ -152,3 +153,42 @@ func TestCacheShrink(t *testing.T) { require.Equal(t, 0, len(deduplicate.Cache)) } + +func TestSameTimestamp(t *testing.T) { + now := time.Now() + dedup := createDedup(now) + var in telegraf.Metric + var out []telegraf.Metric + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 1}, // field + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 1}, // different field + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 2}, // same field different value + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 2}, // same field same value + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{}, out) // drop +}