From da802768023bbaf40327358c3ab24a56b09372e1 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 15 Jan 2019 11:48:52 -0800 Subject: [PATCH] Use lifo order in metric buffer (#5287) --- internal/models/buffer.go | 145 ++++++++------ internal/models/buffer_test.go | 265 ++++++++++++++++++++++++- internal/models/running_output_test.go | 28 +-- testutil/metric.go | 4 + 4 files changed, 365 insertions(+), 77 deletions(-) diff --git a/internal/models/buffer.go b/internal/models/buffer.go index 8c03db3d4..3d82e573a 100644 --- a/internal/models/buffer.go +++ b/internal/models/buffer.go @@ -22,8 +22,7 @@ type Buffer struct { cap int // the capacity of the buffer batchFirst int // index of the first metric in the batch - batchLast int // one after the index of the last metric in the batch - batchSize int // number of metrics current in the batch + batchSize int // number of metrics currently in the batch MetricsAdded selfstat.Stat MetricsWritten selfstat.Stat @@ -82,46 +81,24 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) { metric.Reject() } -func (b *Buffer) inBatch() bool { - if b.batchSize == 0 { - return false - } - - if b.batchFirst < b.batchLast { - return b.last >= b.batchFirst && b.last < b.batchLast - } else { - return b.last >= b.batchFirst || b.last < b.batchLast - } -} - func (b *Buffer) add(m telegraf.Metric) { // Check if Buffer is full if b.size == b.cap { - if b.batchSize == 0 { - // No batch taken by the output, we can drop the metric now. - b.metricDropped(b.buf[b.last]) - } else if b.inBatch() { - // There is an outstanding batch and this will overwrite a metric - // in it, delay the dropping only in case the batch gets rejected. + b.metricDropped(b.buf[b.last]) + + if b.last == b.batchFirst && b.batchSize > 0 { b.batchSize-- - b.batchFirst++ - b.batchFirst %= b.cap - } else { - // There is an outstanding batch, but this overwrites a metric - // outside of it. - b.metricDropped(b.buf[b.last]) + b.batchFirst = b.next(b.batchFirst) } } b.metricAdded() b.buf[b.last] = m - b.last++ - b.last %= b.cap + b.last = b.next(b.last) if b.size == b.cap { - b.first++ - b.first %= b.cap + b.first = b.next(b.first) } b.size = min(b.size+1, b.cap) @@ -138,10 +115,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { } // Batch returns a slice containing up to batchSize of the most recently added -// metrics. -// -// The metrics contained in the batch are not removed from the buffer, instead -// the last batch is recorded and removed only if Accept is called. +// metrics. Metrics are ordered from newest to oldest in the batch. The +// batch must not be modified by the client. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { b.Lock() defer b.Unlock() @@ -152,21 +127,23 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric { return out } - b.batchFirst = b.first - b.batchLast = b.first + outLen - b.batchLast %= b.cap + b.batchFirst = b.cap + b.last - outLen + b.batchFirst %= b.cap b.batchSize = outLen - until := min(b.cap, b.first+outLen) - - n := copy(out, b.buf[b.first:until]) - if n < outLen { - copy(out[n:], b.buf[:outLen-n]) + batchIndex := b.batchFirst + for i := range out { + out[len(out)-1-i] = b.buf[batchIndex] + b.buf[batchIndex] = nil + batchIndex = b.next(batchIndex) } + + b.last = b.batchFirst + b.size -= outLen return out } -// Accept removes the metrics contained in the last batch. +// Accept marks the batch, acquired from Batch(), as successfully written. func (b *Buffer) Accept(batch []telegraf.Metric) { b.Lock() defer b.Unlock() @@ -175,35 +152,89 @@ func (b *Buffer) Accept(batch []telegraf.Metric) { b.metricWritten(m) } - b.size -= b.batchSize - for i := 0; i < b.batchSize; i++ { - b.buf[b.first] = nil - b.first++ - b.first %= b.cap - } - b.resetBatch() } -// Reject clears the current batch record so that calls to Accept will have no -// effect. +// Reject returns the batch, acquired from Batch(), to the buffer and marks it +// as unsent. func (b *Buffer) Reject(batch []telegraf.Metric) { b.Lock() defer b.Unlock() - if len(batch) > b.batchSize { - // Part or all of the batch was dropped before reject was called. - for _, m := range batch[b.batchSize:] { - b.metricDropped(m) + older := b.dist(b.first, b.batchFirst) + free := b.cap - b.size + restore := min(len(batch), free+older) + + // Rotate newer metrics forward the number of metrics that we can restore. + rb := b.batchFirst + rp := b.last + re := b.nextby(rp, restore) + b.last = re + for rb != rp { + rp = b.prev(rp) + re = b.prev(re) + + if b.buf[re] != nil { + b.metricDropped(b.buf[re]) + } + + b.buf[re] = b.buf[rp] + b.buf[rp] = nil + } + + // Copy metrics from the batch back into the buffer; recall that the + // batch is in reverse order compared to b.buf + for i := range batch { + if i < restore { + re = b.prev(re) + b.buf[re] = batch[i] + b.size++ + } else { + b.metricDropped(batch[i]) } } b.resetBatch() } +// dist returns the distance between two indexes. Because this data structure +// uses a half open range the arguments must both either left side or right +// side pairs. +func (b *Buffer) dist(begin, end int) int { + if begin <= end { + return end - begin + } else { + return b.cap - begin - 1 + end + } +} + +// next returns the next index with wrapping. +func (b *Buffer) next(index int) int { + index++ + if index == b.cap { + return 0 + } + return index +} + +// next returns the index that is count newer with wrapping. +func (b *Buffer) nextby(index, count int) int { + index += count + index %= b.cap + return index +} + +// next returns the prev index with wrapping. +func (b *Buffer) prev(index int) int { + index-- + if index < 0 { + return b.cap - 1 + } + return index +} + func (b *Buffer) resetBatch() { b.batchFirst = 0 - b.batchLast = 0 b.batchSize = 0 } diff --git a/internal/models/buffer_test.go b/internal/models/buffer_test.go index 246aaf6ea..7aa55a2c2 100644 --- a/internal/models/buffer_test.go +++ b/internal/models/buffer_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -29,13 +30,17 @@ func (m *MockMetric) Drop() { } func Metric() telegraf.Metric { + return MetricTime(0) +} + +func MetricTime(sec int64) telegraf.Metric { m, err := metric.New( "cpu", map[string]string{}, map[string]interface{}{ "value": 42.0, }, - time.Unix(0, 0), + time.Unix(sec, 0), ) if err != nil { panic(err) @@ -147,6 +152,256 @@ func TestBuffer_BatchWrap(t *testing.T) { require.Len(t, batch, 5) } +func TestBuffer_BatchLatest(t *testing.T) { + b := setup(NewBuffer("test", 4)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(3), + MetricTime(2), + }, batch) +} + +func TestBuffer_BatchLatestWrap(t *testing.T) { + b := setup(NewBuffer("test", 4)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(5), + MetricTime(4), + }, batch) +} + +func TestBuffer_MultipleBatch(t *testing.T) { + b := setup(NewBuffer("test", 10)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + batch := b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(6), + MetricTime(5), + MetricTime(4), + MetricTime(3), + MetricTime(2), + }, batch) + b.Accept(batch) + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(1), + }, batch) + b.Accept(batch) +} + +func TestBuffer_RejectWithRoom(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Reject(batch) + + require.Equal(t, int64(0), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(5), + MetricTime(4), + MetricTime(3), + MetricTime(2), + MetricTime(1), + }, batch) +} + +func TestBuffer_RejectNothingNewFull(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + batch := b.Batch(2) + b.Reject(batch) + + require.Equal(t, int64(0), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(5), + MetricTime(4), + MetricTime(3), + MetricTime(2), + MetricTime(1), + }, batch) +} + +func TestBuffer_RejectNoRoom(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + + b.Reject(batch) + + require.Equal(t, int64(3), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(8), + MetricTime(7), + MetricTime(6), + MetricTime(5), + MetricTime(4), + }, batch) +} + +func TestBuffer_RejectRoomExact(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + batch := b.Batch(2) + b.Add(MetricTime(3)) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + b.Reject(batch) + + require.Equal(t, int64(0), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(5), + MetricTime(4), + MetricTime(3), + MetricTime(2), + MetricTime(1), + }, batch) +} + +func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(1) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + + b.Reject(batch) + + require.Equal(t, int64(1), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(6), + MetricTime(5), + MetricTime(4), + MetricTime(3), + MetricTime(2), + }, batch) +} + +func TestBuffer_RejectPartialRoom(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Reject(batch) + + require.Equal(t, int64(2), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(7), + MetricTime(6), + MetricTime(5), + MetricTime(4), + MetricTime(3), + }, batch) +} + +func TestBuffer_RejectWrapped(t *testing.T) { + b := setup(NewBuffer("test", 5)) + b.Add(MetricTime(1)) + b.Add(MetricTime(2)) + b.Add(MetricTime(3)) + batch := b.Batch(2) + b.Add(MetricTime(4)) + b.Add(MetricTime(5)) + + // buffer: 1, 4, 5; batch: 2, 3 + require.Equal(t, int64(0), b.MetricsDropped.Get()) + + b.Add(MetricTime(6)) + b.Add(MetricTime(7)) + b.Add(MetricTime(8)) + b.Add(MetricTime(9)) + b.Add(MetricTime(10)) + + // buffer: 8, 9, 10, 6, 7; batch: 2, 3 + require.Equal(t, int64(3), b.MetricsDropped.Get()) + + b.Add(MetricTime(11)) + b.Add(MetricTime(12)) + b.Add(MetricTime(13)) + b.Add(MetricTime(14)) + b.Add(MetricTime(15)) + // buffer: 13, 14, 15, 11, 12; batch: 2, 3 + require.Equal(t, int64(8), b.MetricsDropped.Get()) + b.Reject(batch) + + require.Equal(t, int64(10), b.MetricsDropped.Get()) + + batch = b.Batch(5) + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + MetricTime(15), + MetricTime(14), + MetricTime(13), + MetricTime(12), + MetricTime(11), + }, batch) +} + func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { m := Metric() b := setup(NewBuffer("test", 5)) @@ -210,8 +465,8 @@ func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) { batch := b.Batch(3) b.Add(m, m, m) b.Accept(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get()) - require.Equal(t, int64(3), b.MetricsWritten.Get()) + require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped") + require.Equal(t, int64(3), b.MetricsWritten.Get(), "written") } func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) { @@ -254,7 +509,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) { b := setup(NewBuffer("test", 5)) b.Add(m, m, m, m, m) b.Batch(2) - require.Equal(t, 5, b.Len()) + require.Equal(t, 3, b.Len()) } func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { @@ -310,10 +565,8 @@ func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) { b.Add(mm, mm, mm, mm, mm) batch := b.Batch(2) b.Add(mm, mm, mm, mm) - // metric[2] and metric[3] rejected require.Equal(t, 2, reject) b.Reject(batch) - // metric[1] and metric[2] now rejected require.Equal(t, 4, reject) } diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index fe8755395..fd38b0faa 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -28,6 +27,14 @@ var next5 = []telegraf.Metric{ testutil.TestMetric(101, "metric10"), } +func reverse(metrics []telegraf.Metric) []telegraf.Metric { + result := make([]telegraf.Metric, 0, len(metrics)) + for i := len(metrics) - 1; i >= 0; i-- { + result = append(result, metrics[i]) + } + return result +} + // Benchmark adding metrics. func BenchmarkRunningOutputAddWrite(b *testing.B) { conf := &OutputConfig{ @@ -297,7 +304,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { // Verify that 10 metrics were written assert.Len(t, m.Metrics(), 10) // Verify that they are in order - expected := append(first5, next5...) + expected := append(reverse(next5), reverse(first5)...) assert.Equal(t, expected, m.Metrics()) } @@ -355,24 +362,17 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { err = ro.Write() require.NoError(t, err) - // Verify that 10 metrics were written + // Verify that 20 metrics were written assert.Len(t, m.Metrics(), 20) // Verify that they are in order - expected := append(first5, next5...) - expected = append(expected, first5...) - expected = append(expected, next5...) + expected := append(reverse(next5), reverse(first5)...) + expected = append(expected, reverse(next5)...) + expected = append(expected, reverse(first5)...) assert.Equal(t, expected, m.Metrics()) } // Verify that the order of points is preserved when there is a remainder // of points for the batch. -// -// ie, with a batch size of 5: -// -// 1 2 3 4 5 6 <-- order, failed points -// 6 1 2 3 4 5 <-- order, after 1st write failure (1 2 3 4 5 was batch) -// 1 2 3 4 5 6 <-- order, after 2nd write failure, (6 was batch) -// func TestRunningOutputWriteFailOrder3(t *testing.T) { conf := &OutputConfig{ Filter: Filter{}, @@ -408,7 +408,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { // Verify that 6 metrics were written assert.Len(t, m.Metrics(), 6) // Verify that they are in order - expected := append(first5, next5[0]) + expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]} assert.Equal(t, expected, m.Metrics()) } diff --git a/testutil/metric.go b/testutil/metric.go index 6d0db4e17..5ce0a99a6 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -19,6 +19,10 @@ type metricDiff struct { } func newMetricDiff(metric telegraf.Metric) *metricDiff { + if metric == nil { + return nil + } + m := &metricDiff{} m.Measurement = metric.Name()