diff --git a/internal/models/buffer.go b/internal/models/buffer.go index 5d036c728..f7f343e46 100644 --- a/internal/models/buffer.go +++ b/internal/models/buffer.go @@ -99,10 +99,12 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) { metric.Reject() } -func (b *Buffer) add(m telegraf.Metric) { +func (b *Buffer) add(m telegraf.Metric) int { + dropped := 0 // Check if Buffer is full if b.size == b.cap { b.metricDropped(b.buf[b.last]) + dropped++ if b.last == b.batchFirst && b.batchSize > 0 { b.batchSize-- @@ -120,18 +122,23 @@ func (b *Buffer) add(m telegraf.Metric) { } b.size = min(b.size+1, b.cap) + return dropped } -// Add adds metrics to the buffer -func (b *Buffer) Add(metrics ...telegraf.Metric) { +// Add adds metrics to the buffer and returns number of dropped metrics. +func (b *Buffer) Add(metrics ...telegraf.Metric) int { b.Lock() defer b.Unlock() + dropped := 0 for i := range metrics { - b.add(metrics[i]) + if n := b.add(metrics[i]); n != 0 { + dropped += n + } } b.BufferSize.Set(int64(b.length())) + return dropped } // Batch returns a slice containing up to batchSize of the most recently added diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 4cec18cc8..ff2b88e2a 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -32,6 +32,7 @@ type OutputConfig struct { type RunningOutput struct { // Must be 64-bit aligned newMetricsCount int64 + droppedMetrics int64 Name string Output telegraf.Output @@ -118,7 +119,8 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { return } - ro.buffer.Add(metric) + dropped := ro.buffer.Add(metric) + atomic.AddInt64(&ro.droppedMetrics, int64(dropped)) count := atomic.AddInt64(&ro.newMetricsCount, 1) if count == int64(ro.MetricBatchSize) { @@ -188,6 +190,13 @@ func (ro *RunningOutput) Close() { } func (ro *RunningOutput) write(metrics []telegraf.Metric) error { + dropped := atomic.LoadInt64(&ro.droppedMetrics) + if dropped > 0 { + log.Printf("W! [outputs.%s] Metric buffer overflow; %d metrics have been dropped", + ro.Name, dropped) + atomic.StoreInt64(&ro.droppedMetrics, 0) + } + start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start)