Log a warning on write if the metric buffer has overflowed (#5959)

This commit is contained in:
Daniel Nelson 2019-06-05 12:34:45 -07:00 committed by GitHub
parent 8bc768b239
commit 77cac557ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 5 deletions

View File

@ -99,10 +99,12 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) {
metric.Reject() metric.Reject()
} }
func (b *Buffer) add(m telegraf.Metric) { func (b *Buffer) add(m telegraf.Metric) int {
dropped := 0
// Check if Buffer is full // Check if Buffer is full
if b.size == b.cap { if b.size == b.cap {
b.metricDropped(b.buf[b.last]) b.metricDropped(b.buf[b.last])
dropped++
if b.last == b.batchFirst && b.batchSize > 0 { if b.last == b.batchFirst && b.batchSize > 0 {
b.batchSize-- b.batchSize--
@ -120,18 +122,23 @@ func (b *Buffer) add(m telegraf.Metric) {
} }
b.size = min(b.size+1, b.cap) b.size = min(b.size+1, b.cap)
return dropped
} }
// Add adds metrics to the buffer // Add adds metrics to the buffer and returns number of dropped metrics.
func (b *Buffer) Add(metrics ...telegraf.Metric) { func (b *Buffer) Add(metrics ...telegraf.Metric) int {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
dropped := 0
for i := range metrics { for i := range metrics {
b.add(metrics[i]) if n := b.add(metrics[i]); n != 0 {
dropped += n
}
} }
b.BufferSize.Set(int64(b.length())) b.BufferSize.Set(int64(b.length()))
return dropped
} }
// Batch returns a slice containing up to batchSize of the most recently added // Batch returns a slice containing up to batchSize of the most recently added

View File

@ -32,6 +32,7 @@ type OutputConfig struct {
type RunningOutput struct { type RunningOutput struct {
// Must be 64-bit aligned // Must be 64-bit aligned
newMetricsCount int64 newMetricsCount int64
droppedMetrics int64
Name string Name string
Output telegraf.Output Output telegraf.Output
@ -118,7 +119,8 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return return
} }
ro.buffer.Add(metric) dropped := ro.buffer.Add(metric)
atomic.AddInt64(&ro.droppedMetrics, int64(dropped))
count := atomic.AddInt64(&ro.newMetricsCount, 1) count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) { if count == int64(ro.MetricBatchSize) {
@ -188,6 +190,13 @@ func (ro *RunningOutput) Close() {
} }
func (ro *RunningOutput) write(metrics []telegraf.Metric) error { func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&ro.droppedMetrics)
if dropped > 0 {
log.Printf("W! [outputs.%s] Metric buffer overflow; %d metrics have been dropped",
ro.Name, dropped)
atomic.StoreInt64(&ro.droppedMetrics, 0)
}
start := time.Now() start := time.Now()
err := ro.Output.Write(metrics) err := ro.Output.Write(metrics)
elapsed := time.Since(start) elapsed := time.Since(start)