diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index ec04c0aaf..eabfaa4bf 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/aggregators/basicstats" _ "github.com/influxdata/telegraf/plugins/aggregators/final" _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" + _ "github.com/influxdata/telegraf/plugins/aggregators/merge" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" _ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter" ) diff --git a/plugins/aggregators/merge/README.md b/plugins/aggregators/merge/README.md new file mode 100644 index 000000000..58fa47bbd --- /dev/null +++ b/plugins/aggregators/merge/README.md @@ -0,0 +1,23 @@ +# Merge Aggregator + +Merge metrics together into a metric with multiple fields into the most memory +and network transfer efficient form. + +Use this plugin when fields are split over multiple metrics, with the same +measurement, tag set and timestamp. By merging into a single metric they can +be handled more efficiently by the output. + +### Configuration + +```toml +[[aggregators.merge]] + # no configuration +``` + +### Example + +```diff +- cpu,host=localhost usage_time=42 1567562620000000000 +- cpu,host=localhost idle_time=42 1567562620000000000 ++ cpu,host=localhost idle_time=42,usage_time=42 1567562620000000000 +``` diff --git a/plugins/aggregators/merge/merge.go b/plugins/aggregators/merge/merge.go new file mode 100644 index 000000000..6a1e82911 --- /dev/null +++ b/plugins/aggregators/merge/merge.go @@ -0,0 +1,62 @@ +package seriesgrouper + +import ( + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +const ( + description = "Merge metrics into multifield metrics by series key" + sampleConfig = "" +) + +type Merge struct { + grouper *metric.SeriesGrouper + log telegraf.Logger +} + +func (a *Merge) Init() error { + a.grouper = metric.NewSeriesGrouper() + return nil +} + +func (a *Merge) Description() string { + return description +} + +func (a *Merge) SampleConfig() string { + return sampleConfig +} + +func (a *Merge) Add(m telegraf.Metric) { + tags := m.Tags() + for _, field := range m.FieldList() { + err := a.grouper.Add(m.Name(), tags, m.Time(), field.Key, field.Value) + if err != nil { + a.log.Errorf("Error adding metric: %v", err) + } + } +} + +func (a *Merge) Push(acc telegraf.Accumulator) { + // Always use nanosecond precision to avoid rounding metrics that were + // produced at a precision higher than the agent default. + acc.SetPrecision(time.Nanosecond) + + for _, m := range a.grouper.Metrics() { + acc.AddMetric(m) + } +} + +func (a *Merge) Reset() { + a.grouper = metric.NewSeriesGrouper() +} + +func init() { + aggregators.Add("merge", func() telegraf.Aggregator { + return &Merge{} + }) +} diff --git a/plugins/aggregators/merge/merge_test.go b/plugins/aggregators/merge/merge_test.go new file mode 100644 index 000000000..2f2703c8f --- /dev/null +++ b/plugins/aggregators/merge/merge_test.go @@ -0,0 +1,186 @@ +package seriesgrouper + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestSimple(t *testing.T) { + plugin := &Merge{} + + err := plugin.Init() + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "time_guest": 42, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestNanosecondPrecision(t *testing.T) { + plugin := &Merge{} + + err := plugin.Init() + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 1), + ), + ) + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 1), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + acc.SetPrecision(time.Second) + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "time_guest": 42, + }, + time.Unix(0, 1), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestReset(t *testing.T) { + plugin := &Merge{} + + err := plugin.Init() + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + plugin.Push(&acc) + + plugin.Reset() + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +}