Update the buffer_size internal metric after writes (#5314)

This commit is contained in:
Daniel Nelson 2019-01-22 13:43:51 -08:00 committed by GitHub
parent fa9a654f2d
commit b34c5e0d04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 45 deletions

View File

@ -27,6 +27,8 @@ type Buffer struct {
MetricsAdded selfstat.Stat MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
} }
// NewBuffer returns a new empty Buffer with the given capacity. // NewBuffer returns a new empty Buffer with the given capacity.
@ -53,7 +55,19 @@ func NewBuffer(name string, capacity int) *Buffer {
"metrics_dropped", "metrics_dropped",
map[string]string{"output": name}, map[string]string{"output": name},
), ),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
} }
b.BufferSize.Set(int64(0))
b.BufferLimit.Set(int64(capacity))
return b return b
} }
@ -62,7 +76,11 @@ func (b *Buffer) Len() int {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
return b.size return b.length()
}
func (b *Buffer) length() int {
return min(b.size+b.batchSize, b.cap)
} }
func (b *Buffer) metricAdded() { func (b *Buffer) metricAdded() {
@ -112,6 +130,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i := range metrics { for i := range metrics {
b.add(metrics[i]) b.add(metrics[i])
} }
b.BufferSize.Set(int64(b.length()))
} }
// Batch returns a slice containing up to batchSize of the most recently added // Batch returns a slice containing up to batchSize of the most recently added
@ -153,6 +173,7 @@ func (b *Buffer) Accept(batch []telegraf.Metric) {
} }
b.resetBatch() b.resetBatch()
b.BufferSize.Set(int64(b.length()))
} }
// Reject returns the batch, acquired from Batch(), to the buffer and marks it // Reject returns the batch, acquired from Batch(), to the buffer and marks it
@ -176,6 +197,7 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
if b.buf[re] != nil { if b.buf[re] != nil {
b.metricDropped(b.buf[re]) b.metricDropped(b.buf[re])
b.first = b.next(b.first)
} }
b.buf[re] = b.buf[rp] b.buf[re] = b.buf[rp]
@ -188,13 +210,14 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
if i < restore { if i < restore {
re = b.prev(re) re = b.prev(re)
b.buf[re] = batch[i] b.buf[re] = batch[i]
b.size++ b.size = min(b.size+1, b.cap)
} else { } else {
b.metricDropped(batch[i]) b.metricDropped(batch[i])
} }
} }
b.resetBatch() b.resetBatch()
b.BufferSize.Set(int64(b.length()))
} }
// dist returns the distance between two indexes. Because this data structure // dist returns the distance between two indexes. Because this data structure
@ -204,7 +227,7 @@ func (b *Buffer) dist(begin, end int) int {
if begin <= end { if begin <= end {
return end - begin return end - begin
} else { } else {
return b.cap - begin - 1 + end return b.cap - begin + end
} }
} }

View File

@ -359,7 +359,7 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
}, batch) }, batch)
} }
func TestBuffer_RejectWrapped(t *testing.T) { func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5)) b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1)) b.Add(MetricTime(1))
b.Add(MetricTime(2)) b.Add(MetricTime(2))
@ -402,6 +402,84 @@ func TestBuffer_RejectWrapped(t *testing.T) {
}, batch) }, batch)
} }
func TestBuffer_RejectWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
batch := b.Batch(3)
b.Add(MetricTime(9))
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
}, batch)
}
func TestBuffer_RejectAdjustFirst(t *testing.T) {
b := setup(NewBuffer("test", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(3)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
batch = b.Batch(3)
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)
b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
batch = b.Batch(3)
b.Add(MetricTime(16))
b.Add(MetricTime(17))
b.Add(MetricTime(18))
b.Reject(batch)
b.Add(MetricTime(19))
batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
}, batch)
}
func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
m := Metric() m := Metric()
b := setup(NewBuffer("test", 5)) b := setup(NewBuffer("test", 5))
@ -509,7 +587,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) {
b := setup(NewBuffer("test", 5)) b := setup(NewBuffer("test", 5))
b.Add(m, m, m, m, m) b.Add(m, m, m, m, m)
b.Batch(2) b.Batch(2)
require.Equal(t, 3, b.Len()) require.Equal(t, 5, b.Len())
} }
func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {

View File

@ -3,6 +3,7 @@ package models
import ( import (
"log" "log"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -29,6 +30,9 @@ type OutputConfig struct {
// RunningOutput contains the output configuration // RunningOutput contains the output configuration
type RunningOutput struct { type RunningOutput struct {
// Must be 64-bit aligned
newMetricsCount int64
Name string Name string
Output telegraf.Output Output telegraf.Output
Config *OutputConfig Config *OutputConfig
@ -36,16 +40,13 @@ type RunningOutput struct {
MetricBatchSize int MetricBatchSize int
MetricsFiltered selfstat.Stat MetricsFiltered selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
WriteTime selfstat.Stat WriteTime selfstat.Stat
batch []telegraf.Metric
buffer *Buffer
BatchReady chan time.Time BatchReady chan time.Time
buffer *Buffer
aggMutex sync.Mutex aggMutex sync.Mutex
batchMutex sync.Mutex
} }
func NewRunningOutput( func NewRunningOutput(
@ -69,7 +70,6 @@ func NewRunningOutput(
} }
ro := &RunningOutput{ ro := &RunningOutput{
Name: name, Name: name,
batch: make([]telegraf.Metric, 0, batchSize),
buffer: NewBuffer(name, bufferLimit), buffer: NewBuffer(name, bufferLimit),
BatchReady: make(chan time.Time, 1), BatchReady: make(chan time.Time, 1),
Output: output, Output: output,
@ -81,16 +81,6 @@ func NewRunningOutput(
"metrics_filtered", "metrics_filtered",
map[string]string{"output": name}, map[string]string{"output": name},
), ),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
WriteTime: selfstat.RegisterTiming( WriteTime: selfstat.RegisterTiming(
"write", "write",
"write_time_ns", "write_time_ns",
@ -98,7 +88,6 @@ func NewRunningOutput(
), ),
} }
ro.BufferLimit.Set(int64(ro.MetricBufferLimit))
return ro return ro
} }
@ -129,28 +118,16 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return return
} }
ro.batchMutex.Lock() ro.buffer.Add(metric)
ro.batch = append(ro.batch, metric)
if len(ro.batch) == ro.MetricBatchSize {
ro.addBatchToBuffer()
nBuffer := ro.buffer.Len()
ro.BufferSize.Set(int64(nBuffer))
count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) {
atomic.StoreInt64(&ro.newMetricsCount, 0)
select { select {
case ro.BatchReady <- time.Now(): case ro.BatchReady <- time.Now():
default: default:
} }
} }
ro.batchMutex.Unlock()
}
// AddBatchToBuffer moves the metrics from the batch into the metric buffer.
func (ro *RunningOutput) addBatchToBuffer() {
ro.buffer.Add(ro.batch...)
ro.batch = ro.batch[:0]
} }
// Write writes all metrics to the output, stopping when all have been sent on // Write writes all metrics to the output, stopping when all have been sent on
@ -163,15 +140,12 @@ func (ro *RunningOutput) Write() error {
output.Reset() output.Reset()
ro.aggMutex.Unlock() ro.aggMutex.Unlock()
} }
// add and write can be called concurrently
ro.batchMutex.Lock()
ro.addBatchToBuffer()
ro.batchMutex.Unlock()
nBuffer := ro.buffer.Len() atomic.StoreInt64(&ro.newMetricsCount, 0)
// Only process the metrics in the buffer now. Metrics added while we are // Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call. // writing will be sent on the next call.
nBuffer := ro.buffer.Len()
nBatches := nBuffer/ro.MetricBatchSize + 1 nBatches := nBuffer/ro.MetricBatchSize + 1
for i := 0; i < nBatches; i++ { for i := 0; i < nBatches; i++ {
batch := ro.buffer.Batch(ro.MetricBatchSize) batch := ro.buffer.Batch(ro.MetricBatchSize)
@ -189,7 +163,7 @@ func (ro *RunningOutput) Write() error {
return nil return nil
} }
// WriteBatch writes only the batch metrics to the output. // WriteBatch writes a single batch of metrics to the output.
func (ro *RunningOutput) WriteBatch() error { func (ro *RunningOutput) WriteBatch() error {
batch := ro.buffer.Batch(ro.MetricBatchSize) batch := ro.buffer.Batch(ro.MetricBatchSize)
if len(batch) == 0 { if len(batch) == 0 {