From b34c5e0d04cad9ca9bce41369aea20d95c7e7a11 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 22 Jan 2019 13:43:51 -0800 Subject: [PATCH] Update the buffer_size internal metric after writes (#5314) --- internal/models/buffer.go | 29 +++++++++-- internal/models/buffer_test.go | 82 ++++++++++++++++++++++++++++++- internal/models/running_output.go | 54 ++++++-------------- 3 files changed, 120 insertions(+), 45 deletions(-) diff --git a/internal/models/buffer.go b/internal/models/buffer.go index 3d82e573a..7b27e6686 100644 --- a/internal/models/buffer.go +++ b/internal/models/buffer.go @@ -27,6 +27,8 @@ type Buffer struct { MetricsAdded selfstat.Stat MetricsWritten selfstat.Stat MetricsDropped selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat } // NewBuffer returns a new empty Buffer with the given capacity. @@ -53,7 +55,19 @@ func NewBuffer(name string, capacity int) *Buffer { "metrics_dropped", map[string]string{"output": name}, ), + BufferSize: selfstat.Register( + "write", + "buffer_size", + map[string]string{"output": name}, + ), + BufferLimit: selfstat.Register( + "write", + "buffer_limit", + map[string]string{"output": name}, + ), } + b.BufferSize.Set(int64(0)) + b.BufferLimit.Set(int64(capacity)) return b } @@ -62,7 +76,11 @@ func (b *Buffer) Len() int { b.Lock() defer b.Unlock() - return b.size + return b.length() +} + +func (b *Buffer) length() int { + return min(b.size+b.batchSize, b.cap) } func (b *Buffer) metricAdded() { @@ -112,6 +130,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { for i := range metrics { b.add(metrics[i]) } + + b.BufferSize.Set(int64(b.length())) } // Batch returns a slice containing up to batchSize of the most recently added @@ -153,6 +173,7 @@ func (b *Buffer) Accept(batch []telegraf.Metric) { } b.resetBatch() + b.BufferSize.Set(int64(b.length())) } // Reject returns the batch, acquired from Batch(), to the buffer and marks it @@ -176,6 +197,7 @@ func (b *Buffer) Reject(batch []telegraf.Metric) { if b.buf[re] != nil { b.metricDropped(b.buf[re]) + b.first = b.next(b.first) } b.buf[re] = b.buf[rp] @@ -188,13 +210,14 @@ func (b *Buffer) Reject(batch []telegraf.Metric) { if i < restore { re = b.prev(re) b.buf[re] = batch[i] - b.size++ + b.size = min(b.size+1, b.cap) } else { b.metricDropped(batch[i]) } } b.resetBatch() + b.BufferSize.Set(int64(b.length())) } // dist returns the distance between two indexes. Because this data structure @@ -204,7 +227,7 @@ func (b *Buffer) dist(begin, end int) int { if begin <= end { return end - begin } else { - return b.cap - begin - 1 + end + return b.cap - begin + end } } diff --git a/internal/models/buffer_test.go b/internal/models/buffer_test.go index 7aa55a2c2..892af8bd4 100644 --- a/internal/models/buffer_test.go +++ b/internal/models/buffer_test.go @@ -359,7 +359,7 @@ func TestBuffer_RejectPartialRoom(t *testing.T) { }, batch) } -func TestBuffer_RejectWrapped(t *testing.T) { +func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { b := setup(NewBuffer("test", 5)) b.Add(MetricTime(1)) b.Add(MetricTime(2)) @@ -402,6 +402,84 @@ func TestBuffer_RejectWrapped(t *testing.T) { }, batch) } +func TestBuffer_RejectWrapped(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + batch := b.Batch(3) + + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + + b.Reject(batch) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(12), + MetricTime(11), + MetricTime(10), + MetricTime(9), + MetricTime(8), + }, batch) +} + +func TestBuffer_RejectAdjustFirst(t *testing.T) { + b := setup(NewBuffer("test", 10)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(3) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Reject(batch) + + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + batch = b.Batch(3) + b.Add(MetricTime(10)) + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Reject(batch) + + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + batch = b.Batch(3) + b.Add(MetricTime(16)) + b.Add(MetricTime(17)) + b.Add(MetricTime(18)) + b.Reject(batch) + + b.Add(MetricTime(19)) + + batch = b.Batch(10) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(19), + MetricTime(18), + MetricTime(17), + MetricTime(16), + MetricTime(15), + MetricTime(14), + MetricTime(13), + MetricTime(12), + MetricTime(11), + MetricTime(10), + }, batch) +} + func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { m := Metric() b := setup(NewBuffer("test", 5)) @@ -509,7 +587,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) { b := setup(NewBuffer("test", 5)) b.Add(m, m, m, m, m) b.Batch(2) - require.Equal(t, 3, b.Len()) + require.Equal(t, 5, b.Len()) } func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 8d7d9854b..531a3065b 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -3,6 +3,7 @@ package models import ( "log" "sync" + "sync/atomic" "time" "github.com/influxdata/telegraf" @@ -29,6 +30,9 @@ type OutputConfig struct { // RunningOutput contains the output configuration type RunningOutput struct { + // Must be 64-bit aligned + newMetricsCount int64 + Name string Output telegraf.Output Config *OutputConfig @@ -36,16 +40,13 @@ type RunningOutput struct { MetricBatchSize int MetricsFiltered selfstat.Stat - BufferSize selfstat.Stat - BufferLimit selfstat.Stat WriteTime selfstat.Stat - batch []telegraf.Metric - buffer *Buffer BatchReady chan time.Time - aggMutex sync.Mutex - batchMutex sync.Mutex + buffer *Buffer + + aggMutex sync.Mutex } func NewRunningOutput( @@ -69,7 +70,6 @@ func NewRunningOutput( } ro := &RunningOutput{ Name: name, - batch: make([]telegraf.Metric, 0, batchSize), buffer: NewBuffer(name, bufferLimit), BatchReady: make(chan time.Time, 1), Output: output, @@ -81,16 +81,6 @@ func NewRunningOutput( "metrics_filtered", map[string]string{"output": name}, ), - BufferSize: selfstat.Register( - "write", - "buffer_size", - map[string]string{"output": name}, - ), - BufferLimit: selfstat.Register( - "write", - "buffer_limit", - map[string]string{"output": name}, - ), WriteTime: selfstat.RegisterTiming( "write", "write_time_ns", @@ -98,7 +88,6 @@ func NewRunningOutput( ), } - ro.BufferLimit.Set(int64(ro.MetricBufferLimit)) return ro } @@ -129,28 +118,16 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { return } - ro.batchMutex.Lock() - - ro.batch = append(ro.batch, metric) - if len(ro.batch) == ro.MetricBatchSize { - ro.addBatchToBuffer() - - nBuffer := ro.buffer.Len() - ro.BufferSize.Set(int64(nBuffer)) + ro.buffer.Add(metric) + count := atomic.AddInt64(&ro.newMetricsCount, 1) + if count == int64(ro.MetricBatchSize) { + atomic.StoreInt64(&ro.newMetricsCount, 0) select { case ro.BatchReady <- time.Now(): default: } } - - ro.batchMutex.Unlock() -} - -// AddBatchToBuffer moves the metrics from the batch into the metric buffer. -func (ro *RunningOutput) addBatchToBuffer() { - ro.buffer.Add(ro.batch...) - ro.batch = ro.batch[:0] } // Write writes all metrics to the output, stopping when all have been sent on @@ -163,15 +140,12 @@ func (ro *RunningOutput) Write() error { output.Reset() ro.aggMutex.Unlock() } - // add and write can be called concurrently - ro.batchMutex.Lock() - ro.addBatchToBuffer() - ro.batchMutex.Unlock() - nBuffer := ro.buffer.Len() + atomic.StoreInt64(&ro.newMetricsCount, 0) // Only process the metrics in the buffer now. Metrics added while we are // writing will be sent on the next call. + nBuffer := ro.buffer.Len() nBatches := nBuffer/ro.MetricBatchSize + 1 for i := 0; i < nBatches; i++ { batch := ro.buffer.Batch(ro.MetricBatchSize) @@ -189,7 +163,7 @@ func (ro *RunningOutput) Write() error { return nil } -// WriteBatch writes only the batch metrics to the output. +// WriteBatch writes a single batch of metrics to the output. func (ro *RunningOutput) WriteBatch() error { batch := ro.buffer.Batch(ro.MetricBatchSize) if len(batch) == 0 {