Use lifo order in metric buffer (#5287)

Daniel Nelson 2019-01-15 11:48:52 -08:00 committed by GitHub
parent 193aba8673
commit da80276802
4 changed files with 365 additions and 77 deletions


@ -22,8 +22,7 @@ type Buffer struct {
cap int // the capacity of the buffer
batchFirst int // index of the first metric in the batch
batchLast int // one after the index of the last metric in the batch
batchSize int // number of metrics current in the batch
batchSize int // number of metrics currently in the batch
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
@ -82,46 +81,24 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) {
metric.Reject()
}
func (b *Buffer) inBatch() bool {
if b.batchSize == 0 {
return false
}
if b.batchFirst < b.batchLast {
return b.last >= b.batchFirst && b.last < b.batchLast
} else {
return b.last >= b.batchFirst || b.last < b.batchLast
}
}
func (b *Buffer) add(m telegraf.Metric) {
// Check if Buffer is full
if b.size == b.cap {
if b.batchSize == 0 {
// No batch taken by the output, we can drop the metric now.
b.metricDropped(b.buf[b.last])
} else if b.inBatch() {
// There is an outstanding batch and this will overwrite a metric
// in it, delay the dropping only in case the batch gets rejected.
if b.last == b.batchFirst && b.batchSize > 0 {
b.batchSize--
b.batchFirst++
b.batchFirst %= b.cap
} else {
// There is an outstanding batch, but this overwrites a metric
// outside of it.
b.metricDropped(b.buf[b.last])
b.batchFirst = b.next(b.batchFirst)
}
}
b.metricAdded()
b.buf[b.last] = m
b.last++
b.last %= b.cap
b.last = b.next(b.last)
if b.size == b.cap {
b.first++
b.first %= b.cap
b.first = b.next(b.first)
}
b.size = min(b.size+1, b.cap)
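
When the buffer is full, the branch above drops the metric that is about to be overwritten, and if that slot is the oldest entry of an outstanding batch the batch window is shrunk so a later Accept or Reject stays consistent. A minimal sketch of the overflow behaviour, written against the NewBuffer/Add/Batch API exercised by the tests further down (not part of this change; the capacity and metric times are made up for illustration):

b := NewBuffer("example", 3)
b.Add(MetricTime(1), MetricTime(2), MetricTime(3))

// The ring is full, so this Add drops the oldest metric (1) and counts it
// in MetricsDropped before storing the new one.
b.Add(MetricTime(4))

// Batches now come back newest first.
batch := b.Batch(3) // [4, 3, 2]
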
@ -138,10 +115,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
}
// Batch returns a slice containing up to batchSize of the most recently added
// metrics.
//
// The metrics contained in the batch are not removed from the buffer, instead
// the last batch is recorded and removed only if Accept is called.
// metrics. Metrics are ordered from newest to oldest in the batch. The
// batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
defer b.Unlock()
@ -152,21 +127,23 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
return out
}
b.batchFirst = b.first
b.batchLast = b.first + outLen
b.batchLast %= b.cap
b.batchFirst = b.cap + b.last - outLen
b.batchFirst %= b.cap
b.batchSize = outLen
until := min(b.cap, b.first+outLen)
n := copy(out, b.buf[b.first:until])
if n < outLen {
copy(out[n:], b.buf[:outLen-n])
batchIndex := b.batchFirst
for i := range out {
out[len(out)-1-i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}
b.last = b.batchFirst
b.size -= outLen
return out
}
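
The comment above states the contract the rest of this change relies on: Batch removes up to batchSize of the newest metrics from the buffer, and the caller then reports the outcome with Accept or Reject. A sketch of one flush cycle against that contract, written as if inside this package (flushOnce and the write callback are hypothetical, not part of this change):

// Sketch only: one flush cycle using the Buffer contract described above.
func flushOnce(b *Buffer, write func([]telegraf.Metric) error) error {
	batch := b.Batch(1000) // newest metrics first
	if len(batch) == 0 {
		return nil
	}
	if err := write(batch); err != nil {
		// Failed write: hand the batch back to the buffer, still unsent.
		b.Reject(batch)
		return err
	}
	// Successful write: the batch is marked written and gone for good.
	b.Accept(batch)
	return nil
}
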
// Accept removes the metrics contained in the last batch.
// Accept marks the batch, acquired from Batch(), as successfully written.
func (b *Buffer) Accept(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
@ -175,35 +152,89 @@ func (b *Buffer) Accept(batch []telegraf.Metric) {
b.metricWritten(m)
}
b.size -= b.batchSize
for i := 0; i < b.batchSize; i++ {
b.buf[b.first] = nil
b.first++
b.first %= b.cap
}
b.resetBatch()
}
// Reject clears the current batch record so that calls to Accept will have no
// effect.
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
func (b *Buffer) Reject(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
if len(batch) > b.batchSize {
// Part or all of the batch was dropped before reject was called.
for _, m := range batch[b.batchSize:] {
b.metricDropped(m)
older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)
// Rotate newer metrics forward by the number of metrics that we can restore.
rb := b.batchFirst
rp := b.last
re := b.nextby(rp, restore)
b.last = re
for rb != rp {
rp = b.prev(rp)
re = b.prev(re)
if b.buf[re] != nil {
b.metricDropped(b.buf[re])
}
b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}
// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
for i := range batch {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size++
} else {
b.metricDropped(batch[i])
}
}
b.resetBatch()
}
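
To make the restore arithmetic above concrete, here are the numbers for the TestBuffer_RejectPartialRoom case added below (cap = 5; the batch [3, 2] was taken while metric 1 was still buffered, then metrics 4 through 7 refilled the buffer, so free = 0 and older = 1):

restore = min(len(batch), free + older)
        = min(2, 0 + 1)
        = 1

Only metric 3, the newest entry of the batch, fits back in; the rotation overwrites metric 1, and metric 2 falls beyond the restore count, so both are counted in MetricsDropped, which the test expects to read 2.
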
// dist returns the distance between two indexes. Because this data structure
// uses a half open range, the arguments must both be left side indexes or
// both be right side indexes.
func (b *Buffer) dist(begin, end int) int {
if begin <= end {
return end - begin
} else {
return b.cap - begin + end
}
}
// next returns the next index with wrapping.
func (b *Buffer) next(index int) int {
index++
if index == b.cap {
return 0
}
return index
}
// nextby returns the index that is count newer, with wrapping.
func (b *Buffer) nextby(index, count int) int {
index += count
index %= b.cap
return index
}
// prev returns the previous index, with wrapping.
func (b *Buffer) prev(index int) int {
index--
if index < 0 {
return b.cap - 1
}
return index
}
func (b *Buffer) resetBatch() {
b.batchFirst = 0
b.batchLast = 0
b.batchSize = 0
}
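
The index helpers above are plain modular arithmetic over the ring. A standalone sketch of the same wraparound behaviour (not part of this change; the names and the example capacity are illustrative only):

package main

import "fmt"

const ringCap = 5 // stands in for b.cap

// next advances one slot, wrapping to the start of the ring.
func next(index int) int {
	index++
	if index == ringCap {
		return 0
	}
	return index
}

// nextby advances count slots with wrapping.
func nextby(index, count int) int {
	return (index + count) % ringCap
}

// prev steps back one slot, wrapping to the end of the ring.
func prev(index int) int {
	index--
	if index < 0 {
		return ringCap - 1
	}
	return index
}

// dist counts the slots in the half-open range [begin, end), walking
// forward around the ring.
func dist(begin, end int) int {
	if begin <= end {
		return end - begin
	}
	return ringCap - begin + end
}

func main() {
	fmt.Println(next(4))      // 0: wraps past the last slot
	fmt.Println(nextby(3, 4)) // 2: (3+4) mod 5
	fmt.Println(prev(0))      // 4: wraps back to the last slot
	fmt.Println(dist(4, 1))   // 2: slots 4 and 0
}
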


@ -6,6 +6,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
@ -29,13 +30,17 @@ func (m *MockMetric) Drop() {
}
func Metric() telegraf.Metric {
return MetricTime(0)
}
func MetricTime(sec int64) telegraf.Metric {
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
time.Unix(sec, 0),
)
if err != nil {
panic(err)
@ -147,6 +152,256 @@ func TestBuffer_BatchWrap(t *testing.T) {
require.Len(t, batch, 5)
}
func TestBuffer_BatchLatest(t *testing.T) {
b := setup(NewBuffer("test", 4))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(3),
MetricTime(2),
}, batch)
}
func TestBuffer_BatchLatestWrap(t *testing.T) {
b := setup(NewBuffer("test", 4))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
batch := b.Batch(2)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
}, batch)
}
func TestBuffer_MultipleBatch(t *testing.T) {
b := setup(NewBuffer("test", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
batch := b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
}, batch)
b.Accept(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(1),
}, batch)
b.Accept(batch)
}
func TestBuffer_RejectWithRoom(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectNothingNewFull(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
batch := b.Batch(2)
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectNoRoom(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Reject(batch)
require.Equal(t, int64(3), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(8),
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
}, batch)
}
func TestBuffer_RejectRoomExact(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
batch := b.Batch(2)
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(1)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)
require.Equal(t, int64(1), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
}, batch)
}
func TestBuffer_RejectPartialRoom(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Reject(batch)
require.Equal(t, int64(2), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
}, batch)
}
func TestBuffer_RejectWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
// buffer: 1, 4, 5; batch: 2, 3
require.Equal(t, int64(0), b.MetricsDropped.Get())
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
b.Add(MetricTime(10))
// buffer: 8, 9, 10, 6, 7; batch: 2, 3
require.Equal(t, int64(3), b.MetricsDropped.Get())
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
// buffer: 13, 14, 15, 11, 12; batch: 2, 3
require.Equal(t, int64(8), b.MetricsDropped.Get())
b.Reject(batch)
require.Equal(t, int64(10), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
}, batch)
}
func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", 5))
@ -210,8 +465,8 @@ func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) {
batch := b.Batch(3)
b.Add(m, m, m)
b.Accept(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
require.Equal(t, int64(3), b.MetricsWritten.Get())
require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped")
require.Equal(t, int64(3), b.MetricsWritten.Get(), "written")
}
func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) {
@ -254,7 +509,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(m, m, m, m, m)
b.Batch(2)
require.Equal(t, 5, b.Len())
require.Equal(t, 3, b.Len())
}
func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
@ -310,10 +565,8 @@ func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) {
b.Add(mm, mm, mm, mm, mm)
batch := b.Batch(2)
b.Add(mm, mm, mm, mm)
// metric[2] and metric[3] rejected
require.Equal(t, 2, reject)
b.Reject(batch)
// metric[1] and metric[2] now rejected
require.Equal(t, 4, reject)
}


@ -7,7 +7,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -28,6 +27,14 @@ var next5 = []telegraf.Metric{
testutil.TestMetric(101, "metric10"),
}
func reverse(metrics []telegraf.Metric) []telegraf.Metric {
result := make([]telegraf.Metric, 0, len(metrics))
for i := len(metrics) - 1; i >= 0; i-- {
result = append(result, metrics[i])
}
return result
}
// Benchmark adding metrics.
func BenchmarkRunningOutputAddWrite(b *testing.B) {
conf := &OutputConfig{
@ -297,7 +304,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10)
// Verify that they are in order
expected := append(first5, next5...)
expected := append(reverse(next5), reverse(first5)...)
assert.Equal(t, expected, m.Metrics())
}
@ -355,24 +362,17 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
err = ro.Write()
require.NoError(t, err)
// Verify that 10 metrics were written
// Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20)
// Verify that they are in order
expected := append(first5, next5...)
expected = append(expected, first5...)
expected = append(expected, next5...)
expected := append(reverse(next5), reverse(first5)...)
expected = append(expected, reverse(next5)...)
expected = append(expected, reverse(first5)...)
assert.Equal(t, expected, m.Metrics())
}
// Verify that the order of points is preserved when there is a remainder
// of points for the batch.
//
// ie, with a batch size of 5:
//
// 1 2 3 4 5 6 <-- order, failed points
// 6 1 2 3 4 5 <-- order, after 1st write failure (1 2 3 4 5 was batch)
// 1 2 3 4 5 6 <-- order, after 2nd write failure, (6 was batch)
//
func TestRunningOutputWriteFailOrder3(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
@ -408,7 +408,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
// Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6)
// Verify that they are in order
expected := append(first5, next5[0])
expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]}
assert.Equal(t, expected, m.Metrics())
}


@ -19,6 +19,10 @@ type metricDiff struct {
}
func newMetricDiff(metric telegraf.Metric) *metricDiff {
if metric == nil {
return nil
}
m := &metricDiff{}
m.Measurement = metric.Name()