Add a copy of the input metric when adding to aggregator (#5266)
This commit is contained in:
parent
361baaa4bb
commit
0fd08dd65a
|
@ -109,11 +109,12 @@ func (r *RunningAggregator) metricDropped(metric telegraf.Metric) {
|
|||
// Add a metric to the aggregator and return true if the original metric
|
||||
// should be dropped.
|
||||
func (r *RunningAggregator) Add(metric telegraf.Metric) bool {
|
||||
|
||||
if ok := r.Config.Filter.Select(metric); !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
metric = metric.Copy()
|
||||
|
||||
r.Config.Filter.Modify(metric)
|
||||
if len(metric.FieldList()) == 0 {
|
||||
return r.Config.DropOriginal
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -152,6 +151,32 @@ func TestAddDropOriginal(t *testing.T) {
|
|||
require.False(t, ra.Add(m2))
|
||||
}
|
||||
|
||||
func TestAddDoesNotModifyMetric(t *testing.T) {
|
||||
ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{
|
||||
Name: "TestRunningAggregator",
|
||||
Filter: Filter{
|
||||
FieldPass: []string{"a"},
|
||||
},
|
||||
DropOriginal: true,
|
||||
})
|
||||
require.NoError(t, ra.Config.Filter.Compile())
|
||||
|
||||
now := time.Now()
|
||||
|
||||
m := testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": int64(42),
|
||||
"b": int64(42),
|
||||
},
|
||||
now)
|
||||
expected := m.Copy()
|
||||
ra.Add(m)
|
||||
|
||||
testutil.RequireMetricEqual(t, expected, m)
|
||||
}
|
||||
|
||||
type TestAggregator struct {
|
||||
sum int64
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue