diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 04835e042..6a460eccb 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -14,9 +14,12 @@ var ( // Buffer is an object for storing metrics in a circular buffer. type Buffer struct { - buf chan telegraf.Metric - - mu sync.Mutex + sync.Mutex + buf []telegraf.Metric + first int + last int + size int + empty bool } // NewBuffer returns a Buffer @@ -24,47 +27,98 @@ type Buffer struct { // called when the buffer is full, then the oldest metric(s) will be dropped. func NewBuffer(size int) *Buffer { return &Buffer{ - buf: make(chan telegraf.Metric, size), + buf: make([]telegraf.Metric, size), + first: 0, + last: 0, + size: size, + empty: true, } } // IsEmpty returns true if Buffer is empty. func (b *Buffer) IsEmpty() bool { - return len(b.buf) == 0 + return b.empty } // Len returns the current length of the buffer. func (b *Buffer) Len() int { - return len(b.buf) + if b.empty { + return 0 + } else if b.first <= b.last { + return b.last - b.first + 1 + } + // Spans the end of array. + // size - gap in the middle + return b.size - (b.first - b.last - 1) // size - gap +} + +func (b *Buffer) push(m telegraf.Metric) { + // Empty + if b.empty { + b.last = b.first // Reset + b.buf[b.last] = m + b.empty = false + return + } + + b.last++ + b.last %= b.size + + // Full + if b.first == b.last { + MetricsDropped.Incr(1) + b.first = (b.first + 1) % b.size + } + b.buf[b.last] = m } // Add adds metrics to the buffer. func (b *Buffer) Add(metrics ...telegraf.Metric) { - b.mu.Lock() - for i, _ := range metrics { + b.Lock() + defer b.Unlock() + for i := range metrics { MetricsWritten.Incr(1) - select { - case b.buf <- metrics[i]: - default: - MetricsDropped.Incr(1) - <-b.buf - b.buf <- metrics[i] - } + b.push(metrics[i]) } - b.mu.Unlock() } // Batch returns a batch of metrics of size batchSize. // the batch will be of maximum length batchSize. It can be less than batchSize, // if the length of Buffer is less than batchSize. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { - b.mu.Lock() - n := min(len(b.buf), batchSize) - out := make([]telegraf.Metric, n) - for i := 0; i < n; i++ { - out[i] = <-b.buf + b.Lock() + defer b.Unlock() + outLen := min(b.Len(), batchSize) + out := make([]telegraf.Metric, outLen) + if outLen == 0 { + return out + } + + // We copy everything right of first up to last, count or end + // b.last >= rightInd || b.last < b.first + // therefore wont copy past b.last + rightInd := min(b.size, b.first+outLen) - 1 + + copyCount := copy(out, b.buf[b.first:rightInd+1]) + + // We've emptied the ring + if rightInd == b.last { + b.empty = true + } + b.first = rightInd + 1 + b.first %= b.size + + // We circle back for the rest + if copyCount < outLen { + right := min(b.last, outLen-copyCount) + copy(out[copyCount:], b.buf[b.first:right+1]) + // We've emptied the ring + if right == b.last { + b.empty = true + } + b.first = right + 1 + b.first %= b.size } - b.mu.Unlock() return out } diff --git a/internal/buffer/buffer_test.go b/internal/buffer/buffer_test.go index f84d8c66d..b3f666fd0 100644 --- a/internal/buffer/buffer_test.go +++ b/internal/buffer/buffer_test.go @@ -1,6 +1,8 @@ package buffer import ( + "sync" + "sync/atomic" "testing" "github.com/influxdata/telegraf" @@ -17,6 +19,107 @@ var metricList = []telegraf.Metric{ testutil.TestMetric(8, "mymetric5"), } +func makeBench5(b *testing.B, freq, batchSize int) { + const k = 1000 + var wg sync.WaitGroup + buf := NewBuffer(10000) + m := testutil.TestMetric(1, "mymetric") + + for i := 0; i < b.N; i++ { + buf.Add(m, m, m, m, m) + if i%(freq*k) == 0 { + wg.Add(1) + go func() { + buf.Batch(batchSize * k) + wg.Done() + }() + } + } + // Flush + buf.Batch(b.N) + wg.Wait() + +} +func makeBenchStrict(b *testing.B, freq, batchSize int) { + const k = 1000 + var count uint64 + var wg sync.WaitGroup + buf := NewBuffer(10000) + m := testutil.TestMetric(1, "mymetric") + + for i := 0; i < b.N; i++ { + buf.Add(m) + if i%(freq*k) == 0 { + wg.Add(1) + go func() { + defer wg.Done() + l := len(buf.Batch(batchSize * k)) + atomic.AddUint64(&count, uint64(l)) + }() + } + } + // Flush + wg.Add(1) + go func() { + l := len(buf.Batch(b.N)) + atomic.AddUint64(&count, uint64(l)) + wg.Done() + }() + + wg.Wait() + if count != uint64(b.N) { + b.Errorf("not all metrics came out. %d of %d", count, b.N) + } +} +func makeBench(b *testing.B, freq, batchSize int) { + const k = 1000 + var wg sync.WaitGroup + buf := NewBuffer(10000) + m := testutil.TestMetric(1, "mymetric") + + for i := 0; i < b.N; i++ { + buf.Add(m) + if i%(freq*k) == 0 { + wg.Add(1) + go func() { + buf.Batch(batchSize * k) + wg.Done() + }() + } + } + wg.Wait() + // Flush + buf.Batch(b.N) +} + +func BenchmarkBufferBatch5Add(b *testing.B) { + makeBench5(b, 100, 101) +} +func BenchmarkBufferBigInfrequentBatchCatchup(b *testing.B) { + makeBench(b, 100, 101) +} +func BenchmarkBufferOftenBatch(b *testing.B) { + makeBench(b, 1, 1) +} +func BenchmarkBufferAlmostBatch(b *testing.B) { + makeBench(b, 10, 9) +} +func BenchmarkBufferSlowBatch(b *testing.B) { + makeBench(b, 10, 1) +} +func BenchmarkBufferBatchNoDrop(b *testing.B) { + makeBenchStrict(b, 1, 4) +} +func BenchmarkBufferCatchup(b *testing.B) { + buf := NewBuffer(10000) + m := testutil.TestMetric(1, "mymetric") + + for i := 0; i < b.N; i++ { + buf.Add(m) + } + buf.Batch(b.N) +} + func BenchmarkAddMetrics(b *testing.B) { buf := NewBuffer(10000) m := testutil.TestMetric(1, "mymetric")