Performance refactor of running_output buffers

closes #914
closes #967
Cameron Sparr
2016-04-25 17:49:06 -06:00
parent 1c4043ab39
commit 4de75ce621
7 changed files with 587 additions and 206 deletions

internal/models/running_output.go

@@ -2,14 +2,13 @@ package internal_models
import (
"log"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/buffer"
)
const (
// Default number of metrics per batch.
DEFAULT_METRIC_BATCH_SIZE = 1000
@@ -17,40 +16,40 @@ const (
DEFAULT_METRIC_BUFFER_LIMIT = 10000
)
// tmpmetrics points to batches of metrics ready to be written to the output.
// readI points to the oldest batch of metrics (the first to be sent to the
// output). It may point to a nil value if tmpmetrics is empty.
// writeI points to the next slot in which to buffer a batch of metrics if the
// output fails to write.
// RunningOutput contains the output configuration
type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
FlushBufferWhenFull bool
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
metrics []telegraf.Metric
tmpmetrics []([]telegraf.Metric)
writeI int
readI int
sync.Mutex
metrics *buffer.Buffer
failMetrics *buffer.Buffer
}
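For context, the two new buffer fields come from the internal/buffer package added in this commit (among the 7 changed files, but not shown in this view). Below is a minimal sketch of the Buffer type, consistent with how it is used in this diff: a fixed-capacity FIFO that evicts the oldest metric when full while counting total adds and drops. The channel-backed implementation is an assumption based on the usage here.

package buffer

import "github.com/influxdata/telegraf"

// Buffer is a sketch of the fixed-capacity metric FIFO assumed by this diff.
type Buffer struct {
	buf   chan telegraf.Metric
	drops int // total metrics evicted since creation
	total int // total metrics added since creation
}

func NewBuffer(size int) *Buffer {
	return &Buffer{buf: make(chan telegraf.Metric, size)}
}

func (b *Buffer) IsEmpty() bool { return len(b.buf) == 0 }
func (b *Buffer) Len() int      { return len(b.buf) }
func (b *Buffer) Drops() int    { return b.drops }
func (b *Buffer) Total() int    { return b.total }

// Add appends metrics, evicting the oldest entry whenever the buffer is full.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
	for i := range metrics {
		b.total++
		select {
		case b.buf <- metrics[i]:
		default: // full: drop the oldest to make room
			b.drops++
			<-b.buf
			b.buf <- metrics[i]
		}
	}
}

// Batch removes and returns up to batchSize of the oldest buffered metrics.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
	n := len(b.buf)
	if n > batchSize {
		n = batchSize
	}
	out := make([]telegraf.Metric, n)
	for i := 0; i < n; i++ {
		out[i] = <-b.buf
	}
	return out
}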
func NewRunningOutput(
name string,
output telegraf.Output,
conf *OutputConfig,
batchSize int,
bufferLimit int,
) *RunningOutput {
if bufferLimit == 0 {
bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
}
if batchSize == 0 {
batchSize = DEFAULT_METRIC_BATCH_SIZE
}
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
metrics: buffer.NewBuffer(batchSize),
failMetrics: buffer.NewBuffer(bufferLimit),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
MetricBatchSize: DEFAULT_METRIC_BATCH_SIZE,
MetricBufferLimit: bufferLimit,
MetricBatchSize: batchSize,
}
return ro
}
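As the constructor shows, a zero value for either size falls back to the defaults, so a caller that doesn't configure them can pass 0 for both (the "influxdb" name and the output/conf values here are illustrative):

// Uses DEFAULT_METRIC_BATCH_SIZE (1000) and DEFAULT_METRIC_BUFFER_LIMIT (10000).
ro := NewRunningOutput("influxdb", output, conf, 0, 0)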
@@ -63,19 +62,6 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return
}
}
ro.Lock()
defer ro.Unlock()
if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1
if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
}
// Filter any tagexclude/taginclude parameters before adding metric
if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
@@ -90,69 +76,64 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(name, tags, fields, t)
}
if len(ro.metrics) < ro.MetricBatchSize {
ro.metrics = append(ro.metrics, metric)
} else {
flushSuccess := true
if ro.FlushBufferWhenFull {
err := ro.write(ro.metrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
flushSuccess = false
}
} else {
flushSuccess = false
ro.metrics.Add(metric)
if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize)
err := ro.write(batch)
if err != nil {
ro.failMetrics.Add(batch...)
}
if !flushSuccess {
if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
}
ro.tmpmetrics[ro.writeI] = ro.metrics
ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.metrics = append(ro.metrics, metric)
}
}
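A concrete trace of the new flush-on-full behavior in AddMetric, with an illustrative MetricBatchSize of 3:

// AddMetric(m1) -> metrics: [m1]
// AddMetric(m2) -> metrics: [m1 m2]
// AddMetric(m3) -> batch full: Batch(3) == [m1 m2 m3], written immediately
//   on success: both buffers are empty again
//   on failure: failMetrics holds [m1 m2 m3], retried oldest-first by Write()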
// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()
if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1
if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
if !ro.Quiet {
log.Printf("Output [%s] buffer fullness: %d / %d metrics. "+
"Total gathered metrics: %d. Total dropped metrics: %d.",
ro.Name,
ro.failMetrics.Len()+ro.metrics.Len(),
ro.MetricBufferLimit,
ro.metrics.Total(),
ro.metrics.Drops()+ro.failMetrics.Drops())
}
// Write any cached metric buffers first, as those metrics are the
// oldest
for ro.tmpmetrics[ro.readI] != nil {
if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil {
return err
} else {
ro.tmpmetrics[ro.readI] = nil
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
var err error
if !ro.failMetrics.IsEmpty() {
bufLen := ro.failMetrics.Len()
// how many batches of failed writes we need to write.
nBatches := bufLen/ro.MetricBatchSize + 1
batchSize := ro.MetricBatchSize
for i := 0; i < nBatches; i++ {
// If it's the last batch, only grab the metrics that have not had
// a write attempt already (this is primarily to preserve order).
if i == nBatches-1 {
batchSize = bufLen % ro.MetricBatchSize
}
batch := ro.failMetrics.Batch(batchSize)
// If a previous write has already failed, don't bother trying to
// write to this output again. We stay in the loop anyway so the
// batches keep rotating through the buffer, preserving their order.
if err == nil {
err = ro.write(batch)
}
if err != nil {
ro.failMetrics.Add(batch...)
}
}
}
err := ro.write(ro.metrics)
batch := ro.metrics.Batch(ro.MetricBatchSize)
// see comment above about not trying to write to an already failed output.
// if ro.failMetrics is empty then err will always be nil at this point.
if err == nil {
err = ro.write(batch)
}
if err != nil {
ro.failMetrics.Add(batch...)
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
}
return nil
}
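The batching arithmetic above deserves a worked example. With 12 failed metrics and a batch size of 5 (values illustrative):

bufLen, batchSize := 12, 5
nBatches := bufLen/batchSize + 1 // == 3
// i == 0: Batch(5) drains metrics 1..5
// i == 1: Batch(5) drains metrics 6..10
// i == 2 (last): batchSize = bufLen % 5 == 2, so Batch(2) drains metrics 11..12

When the buffer length is an exact multiple of the batch size (say 10 and 5), the last pass asks for 10 % 5 == 0 metrics and Batch returns an empty slice, so the final iteration is effectively a no-op.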
@@ -165,8 +146,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(metrics), ro.Name, elapsed)
log.Printf("Output [%s] wrote batch of %d metrics in %s\n",
ro.Name, len(metrics), elapsed)
}
}
return err

internal/models/running_output_test.go

@@ -2,7 +2,6 @@ package internal_models
import (
"fmt"
"sort"
"sync"
"testing"
@@ -29,6 +28,62 @@ var next5 = []telegraf.Metric{
testutil.TestMetric(101, "metric10"),
}
// Benchmark adding metrics.
func BenchmarkRunningOutputAddWrite(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ {
ro.AddMetric(first5[0])
ro.Write()
}
}
// Benchmark adding metrics, calling Write every 100 adds.
func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ {
ro.AddMetric(first5[0])
if n%100 == 0 {
ro.Write()
}
}
}
// Benchmark adding metrics when every write fails.
func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &perfOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ {
ro.AddMetric(first5[0])
}
}
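These benchmarks run with the standard Go tooling; the package path below is an assumption based on the internal_models package name:

go test -run=NONE -bench=BenchmarkRunningOutput -benchmem ./internal/models/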
// Test that NameDrop filters get properly applied.
func TestRunningOutput_DropFilter(t *testing.T) {
conf := &OutputConfig{
@@ -40,7 +95,7 @@ func TestRunningOutput_DropFilter(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
@@ -66,7 +121,7 @@ func TestRunningOutput_PassFilter(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
@@ -92,7 +147,7 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
@@ -114,7 +169,7 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
@@ -136,7 +191,7 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
@@ -158,7 +213,7 @@ func TestRunningOutput_TagIncludeMatch(t *testing.T) {
assert.NoError(t, conf.Filter.CompileFilter())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(first5[0])
assert.Len(t, m.Metrics(), 0)
@@ -178,7 +233,7 @@ func TestRunningOutputDefault(t *testing.T) {
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
@@ -193,77 +248,6 @@ func TestRunningOutputDefault(t *testing.T) {
assert.Len(t, m.Metrics(), 10)
}
// Test that the first metrics batch gets overwritten if there is a buffer overflow.
func TestRunningOutputOverwrite(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 4
for _, metric := range first5 {
ro.AddMetric(metric)
}
require.Len(t, m.Metrics(), 0)
err := ro.Write()
require.NoError(t, err)
require.Len(t, m.Metrics(), 4)
var expected, actual []string
for i, exp := range first5[1:] {
expected = append(expected, exp.String())
actual = append(actual, m.Metrics()[i].String())
}
sort.Strings(expected)
sort.Strings(actual)
assert.Equal(t, expected, actual)
}
// Test that multiple buffer overflows are handled properly.
func TestRunningOutputMultiOverwrite(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 3
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
require.Len(t, m.Metrics(), 0)
err := ro.Write()
require.NoError(t, err)
require.Len(t, m.Metrics(), 3)
var expected, actual []string
for i, exp := range next5[2:] {
expected = append(expected, exp.String())
actual = append(actual, m.Metrics()[i].String())
}
sort.Strings(expected)
sort.Strings(actual)
assert.Equal(t, expected, actual)
}
// Test that running output doesn't flush until it's full when
// FlushBufferWhenFull is set.
func TestRunningOutputFlushWhenFull(t *testing.T) {
@@ -274,12 +258,9 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBatchSize = 5
ro.MetricBufferLimit = 10
ro := NewRunningOutput("test", m, conf, 6, 10)
// Fill buffer to limit
// Fill buffer to one under the limit
for _, metric := range first5 {
ro.AddMetric(metric)
}
@@ -289,7 +270,7 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
// add one more metric
ro.AddMetric(next5[0])
// now it flushed
assert.Len(t, m.Metrics(), 5)
assert.Len(t, m.Metrics(), 6)
// add one more metric and write it manually
ro.AddMetric(next5[1])
@@ -308,10 +289,7 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12
ro := NewRunningOutput("test", m, conf, 4, 12)
// Fill buffer past limit twice
for _, metric := range first5 {
@@ -333,12 +311,9 @@ func TestRunningOutputWriteFail(t *testing.T) {
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12
ro := NewRunningOutput("test", m, conf, 4, 12)
// Fill buffer past limit twice
// Fill buffer to limit twice
for _, metric := range first5 {
ro.AddMetric(metric)
}
@@ -361,6 +336,161 @@ func TestRunningOutputWriteFail(t *testing.T) {
assert.Len(t, m.Metrics(), 10)
}
// Verify that the order of points is preserved during a write failure.
func TestRunningOutputWriteFailOrder(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 100, 1000)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
// add 5 more metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
err = ro.Write()
require.NoError(t, err)
// Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10)
// Verify that they are in order
expected := append(first5, next5...)
assert.Equal(t, expected, m.Metrics())
}
// Verify that the order of points is preserved during many write failures.
func TestRunningOutputWriteFailOrder2(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 5, 100)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
// Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20)
// Verify that they are in order
expected := append(first5, next5...)
expected = append(expected, first5...)
expected = append(expected, next5...)
assert.Equal(t, expected, m.Metrics())
}
// Verify that the order of points is preserved when there is a remainder
// of points for the batch.
//
// ie, with a batch size of 5:
//
// 1 2 3 4 5 6 <-- order, failed points
// 6 1 2 3 4 5 <-- order, after 1st write failure (1 2 3 4 5 was batch)
// 1 2 3 4 5 6 <-- order, after 2nd write failure, (6 was batch)
//
func TestRunningOutputWriteFailOrder3(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 5, 1000)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add and attempt to write a single metric:
ro.AddMetric(next5[0])
err = ro.Write()
require.Error(t, err)
// unset fail and write metrics
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
// Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6)
// Verify that they are in order
expected := append(first5, next5[0])
assert.Equal(t, expected, m.Metrics())
}
type mockOutput struct {
sync.Mutex
@@ -408,3 +538,31 @@ func (m *mockOutput) Metrics() []telegraf.Metric {
defer m.Unlock()
return m.metrics
}
type perfOutput struct {
// if true, mock a write failure
failWrite bool
}
func (m *perfOutput) Connect() error {
return nil
}
func (m *perfOutput) Close() error {
return nil
}
func (m *perfOutput) Description() string {
return ""
}
func (m *perfOutput) SampleConfig() string {
return ""
}
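// Write discards the metrics instead of storing them (unlike mockOutput),
// so the benchmarks above measure buffering overhead rather than appends.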
func (m *perfOutput) Write(metrics []telegraf.Metric) error {
if m.failWrite {
return fmt.Errorf("Failed Write!")
}
return nil
}