Implement a lock-based ring buffer for internal/buffer. (#3377)

estk 2018-08-20 14:47:48 -06:00 committed by Daniel Nelson
parent 036981c3b5
commit edb6e1f655
2 changed files with 179 additions and 22 deletions

internal/buffer/buffer.go

@@ -14,9 +14,12 @@ var (
 // Buffer is an object for storing metrics in a circular buffer.
 type Buffer struct {
-	buf chan telegraf.Metric
-
-	mu sync.Mutex
+	sync.Mutex
+
+	buf   []telegraf.Metric
+	first int
+	last  int
+	size  int
+	empty bool
 }

 // NewBuffer returns a Buffer
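
Why the extra empty flag: first and last are inclusive indices into buf, so the index pair alone cannot express "no elements". After Batch drains the ring, first lands on (last+1) % size, which is exactly how a full ring looks after push drops its oldest entry; the flag is what tells the two states apart. A minimal sketch of the invariant (ringState is an illustrative helper, not part of this commit):

// ringState classifies a ring that tracks inclusive first/last indices.
func ringState(first, last, size int, empty bool) string {
	switch {
	case empty:
		return "empty" // after a drain this shares the index shape of "full"
	case first == (last+1)%size:
		return "full"
	default:
		return "partially filled"
	}
}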
@@ -24,47 +27,98 @@ type Buffer struct {
 // called when the buffer is full, then the oldest metric(s) will be dropped.
 func NewBuffer(size int) *Buffer {
 	return &Buffer{
-		buf: make(chan telegraf.Metric, size),
+		buf:   make([]telegraf.Metric, size),
+		first: 0,
+		last:  0,
+		size:  size,
+		empty: true,
 	}
 }

 // IsEmpty returns true if Buffer is empty.
 func (b *Buffer) IsEmpty() bool {
-	return len(b.buf) == 0
+	return b.empty
 }

 // Len returns the current length of the buffer.
 func (b *Buffer) Len() int {
-	return len(b.buf)
+	if b.empty {
+		return 0
+	} else if b.first <= b.last {
+		return b.last - b.first + 1
+	}
+	// The contents span the end of the array:
+	// the length is size minus the gap in the middle.
+	return b.size - (b.first - b.last - 1)
+}
+
+func (b *Buffer) push(m telegraf.Metric) {
+	// Empty: reuse the current first slot.
+	if b.empty {
+		b.last = b.first // Reset
+		b.buf[b.last] = m
+		b.empty = false
+		return
+	}
+
+	b.last++
+	b.last %= b.size
+
+	// Full: advancing last collided with first, so drop the oldest metric.
+	if b.first == b.last {
+		MetricsDropped.Incr(1)
+		b.first = (b.first + 1) % b.size
+	}
+
+	b.buf[b.last] = m
 }

 // Add adds metrics to the buffer.
 func (b *Buffer) Add(metrics ...telegraf.Metric) {
-	b.mu.Lock()
-	for i, _ := range metrics {
+	b.Lock()
+	defer b.Unlock()
+	for i := range metrics {
 		MetricsWritten.Incr(1)
-		select {
-		case b.buf <- metrics[i]:
-		default:
-			MetricsDropped.Incr(1)
-			<-b.buf
-			b.buf <- metrics[i]
-		}
+		b.push(metrics[i])
 	}
-	b.mu.Unlock()
 }

 // Batch returns a batch of metrics of size batchSize.
 // the batch will be of maximum length batchSize. It can be less than batchSize,
 // if the length of Buffer is less than batchSize.
 func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
-	b.mu.Lock()
-	n := min(len(b.buf), batchSize)
-	out := make([]telegraf.Metric, n)
-	for i := 0; i < n; i++ {
-		out[i] = <-b.buf
+	b.Lock()
+	defer b.Unlock()
+	outLen := min(b.Len(), batchSize)
+	out := make([]telegraf.Metric, outLen)
+	if outLen == 0 {
+		return out
+	}
+
+	// Copy everything from first up to last, the requested count, or the
+	// end of the array. Since b.last >= rightInd || b.last < b.first,
+	// this won't copy past b.last.
+	rightInd := min(b.size, b.first+outLen) - 1
+	copyCount := copy(out, b.buf[b.first:rightInd+1])
+
+	// We've emptied the ring.
+	if rightInd == b.last {
+		b.empty = true
+	}
+	b.first = rightInd + 1
+	b.first %= b.size
+
+	// Circle back to the front of the array for the rest.
+	if copyCount < outLen {
+		right := min(b.last, outLen-copyCount)
+		copy(out[copyCount:], b.buf[b.first:right+1])
+		// We've emptied the ring.
+		if right == b.last {
+			b.empty = true
+		}
+		b.first = right + 1
+		b.first %= b.size
 	}
-	b.mu.Unlock()
+
 	return out
 }
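
For reference, a short usage sketch of the new API. It is a sketch under two assumptions: it compiles inside the telegraf module (internal/... packages are not importable from outside it), and it borrows the testutil.TestMetric helper that the benchmarks below use; the metric values and names are arbitrary.

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/internal/buffer"
	"github.com/influxdata/telegraf/testutil"
)

func main() {
	// A ring that holds at most three metrics.
	buf := buffer.NewBuffer(3)

	// The fourth Add overwrites the oldest metric and counts it as dropped.
	buf.Add(
		testutil.TestMetric(1, "m1"),
		testutil.TestMetric(2, "m2"),
		testutil.TestMetric(3, "m3"),
		testutil.TestMetric(4, "m4"),
	)
	fmt.Println(buf.Len()) // 3

	// Batch never returns more than is buffered; an oversized request
	// simply drains the ring, wrapping past the end of the backing
	// array as it copies.
	out := buf.Batch(10)
	fmt.Println(len(out), buf.IsEmpty()) // 3 true
}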

internal/buffer/buffer_test.go

@@ -1,6 +1,8 @@
 package buffer

 import (
+	"sync"
+	"sync/atomic"
 	"testing"

 	"github.com/influxdata/telegraf"
@@ -17,6 +19,107 @@ var metricList = []telegraf.Metric{
 	testutil.TestMetric(8, "mymetric5"),
 }

+func makeBench5(b *testing.B, freq, batchSize int) {
+	const k = 1000
+	var wg sync.WaitGroup
+	buf := NewBuffer(10000)
+	m := testutil.TestMetric(1, "mymetric")
+
+	for i := 0; i < b.N; i++ {
+		buf.Add(m, m, m, m, m)
+		if i%(freq*k) == 0 {
+			wg.Add(1)
+			go func() {
+				buf.Batch(batchSize * k)
+				wg.Done()
+			}()
+		}
+	}
+	// Flush
+	buf.Batch(b.N)
+	wg.Wait()
+}
+
+func makeBenchStrict(b *testing.B, freq, batchSize int) {
+	const k = 1000
+	var count uint64
+	var wg sync.WaitGroup
+	buf := NewBuffer(10000)
+	m := testutil.TestMetric(1, "mymetric")
+
+	for i := 0; i < b.N; i++ {
+		buf.Add(m)
+		if i%(freq*k) == 0 {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				l := len(buf.Batch(batchSize * k))
+				atomic.AddUint64(&count, uint64(l))
+			}()
+		}
+	}
+
+	// Flush
+	wg.Add(1)
+	go func() {
+		l := len(buf.Batch(b.N))
+		atomic.AddUint64(&count, uint64(l))
+		wg.Done()
+	}()
+	wg.Wait()
+
+	if count != uint64(b.N) {
+		b.Errorf("not all metrics came out. %d of %d", count, b.N)
+	}
+}
+
+func makeBench(b *testing.B, freq, batchSize int) {
+	const k = 1000
+	var wg sync.WaitGroup
+	buf := NewBuffer(10000)
+	m := testutil.TestMetric(1, "mymetric")
+
+	for i := 0; i < b.N; i++ {
+		buf.Add(m)
+		if i%(freq*k) == 0 {
+			wg.Add(1)
+			go func() {
+				buf.Batch(batchSize * k)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+	// Flush
+	buf.Batch(b.N)
+}
+
+func BenchmarkBufferBatch5Add(b *testing.B) {
+	makeBench5(b, 100, 101)
+}
+
+func BenchmarkBufferBigInfrequentBatchCatchup(b *testing.B) {
+	makeBench(b, 100, 101)
+}
+
+func BenchmarkBufferOftenBatch(b *testing.B) {
+	makeBench(b, 1, 1)
+}
+
+func BenchmarkBufferAlmostBatch(b *testing.B) {
+	makeBench(b, 10, 9)
+}
+
+func BenchmarkBufferSlowBatch(b *testing.B) {
+	makeBench(b, 10, 1)
+}
+
+func BenchmarkBufferBatchNoDrop(b *testing.B) {
+	makeBenchStrict(b, 1, 4)
+}
+
+func BenchmarkBufferCatchup(b *testing.B) {
+	buf := NewBuffer(10000)
+	m := testutil.TestMetric(1, "mymetric")
+	for i := 0; i < b.N; i++ {
+		buf.Add(m)
+	}
+	buf.Batch(b.N)
+}
+
 func BenchmarkAddMetrics(b *testing.B) {
 	buf := NewBuffer(10000)
 	m := testutil.TestMetric(1, "mymetric")
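
The added benchmarks exercise concurrent Add/Batch interleavings at varying frequencies and batch sizes, and makeBenchStrict additionally checks that every metric added eventually comes back out of some Batch call. They run under the standard Go benchmark harness; something like the following (path assumed from the commit message) selects only the buffer benchmarks while skipping the unit tests:

go test -run '^$' -bench Buffer ./internal/buffer/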