From 6cea487bfc88372a04b1885d1cca6ce6777aec2e Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 1 May 2018 16:47:16 -0700 Subject: [PATCH] Add idea for an output that aggregates before adding to metric buffer --- internal/models/running_output.go | 11 +++++++++++ output.go | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 713c28cce..7f3c2967b 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -113,6 +113,11 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { m, _ = metric.New(name, tags, fields, t) } + if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + output.Add(m) + return + } + ro.metrics.Add(m) if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) @@ -125,6 +130,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { + if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + metrics := output.Push() + ro.metrics.Add(metrics...) + output.Reset() + } + nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() ro.BufferSize.Set(int64(nFails + nMetrics)) log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ", diff --git a/output.go b/output.go index d66ea4556..39b371ac4 100644 --- a/output.go +++ b/output.go @@ -13,6 +13,12 @@ type Output interface { Write(metrics []Metric) error } +type AggregatingOutput interface { + Add(in Metric) + Push() []Metric + Reset() +} + type ServiceOutput interface { // Connect to the Output Connect() error