Refactor running_output buffering

This commit is contained in:
Pierre Fersing 2016-04-24 12:43:54 +02:00
parent d3a25e4dc1
commit 91cbcb6cb1
5 changed files with 88 additions and 50 deletions

View File

@ -38,8 +38,12 @@ config.
* **interval**: Default data collection interval for all inputs * **interval**: Default data collection interval for all inputs
* **round_interval**: Rounds collection interval to 'interval' * **round_interval**: Rounds collection interval to 'interval'
ie, if interval="10s" then always collect on :00, :10, :20, etc. ie, if interval="10s" then always collect on :00, :10, :20, etc.
* **metric_batch_size**: Telegraf will send metrics to output in batch of at
most metric_batch_size metrics.
* **metric_buffer_limit**: Telegraf will cache metric_buffer_limit metrics * **metric_buffer_limit**: Telegraf will cache metric_buffer_limit metrics
for each output, and will flush this buffer on a successful write. for each output, and will flush this buffer on a successful write.
This should be a multiple of metric_batch_size and could not be less
than 2 times metric_batch_size.
* **collection_jitter**: Collection jitter is used to jitter * **collection_jitter**: Collection jitter is used to jitter
the collection by a random amount. the collection by a random amount.
Each plugin will sleep for a random time within jitter before collecting. Each plugin will sleep for a random time within jitter before collecting.

View File

@ -30,9 +30,13 @@
## ie, if interval="10s" then always collect on :00, :10, :20, etc. ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true round_interval = true
## Telegraf will send metrics to output in batch of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will ## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write. ## flush this buffer on a successful write. This should be a multiple of
metric_buffer_limit = 1000 ## metric_batch_size and could not be less than 2 times metric_batch_size
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval. ## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true flush_buffer_when_full = true

View File

@ -93,9 +93,15 @@ type AgentConfig struct {
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration FlushJitter internal.Duration
// MetricBatchSize is the maximum number of metrics that is wrote to an
// output plugin in one call.
MetricBatchSize int
// MetricBufferLimit is the max number of metrics that each output plugin // MetricBufferLimit is the max number of metrics that each output plugin
// will cache. The buffer is cleared when a successful write occurs. When // will cache. The buffer is cleared when a successful write occurs. When
// full, the oldest metrics will be overwritten. // full, the oldest metrics will be overwritten. This number should be a
// multiple of MetricBatchSize. Due to current implementation, this could
// not be less than 2 times MetricBatchSize.
MetricBufferLimit int MetricBufferLimit int
// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
@ -182,9 +188,13 @@ var header = `# Telegraf Configuration
## ie, if interval="10s" then always collect on :00, :10, :20, etc. ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true round_interval = true
## Telegraf will send metrics to output in batch of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will ## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write. ## flush this buffer on a successful write. This should be a multiple of
metric_buffer_limit = 1000 ## metric_batch_size and could not be less than 2 times metric_batch_size
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval. ## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true flush_buffer_when_full = true
@ -526,6 +536,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
} }
ro := internal_models.NewRunningOutput(name, output, outputConfig) ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBatchSize > 0 {
ro.MetricBatchSize = c.Agent.MetricBatchSize
}
if c.Agent.MetricBufferLimit > 0 { if c.Agent.MetricBufferLimit > 0 {
ro.MetricBufferLimit = c.Agent.MetricBufferLimit ro.MetricBufferLimit = c.Agent.MetricBufferLimit
} }

View File

@ -9,25 +9,32 @@ import (
) )
const ( const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 1000
// Limit how many full metric buffers are kept due to failed writes. // Default size of metrics batch size.
FULL_METRIC_BUFFERS_LIMIT = 100 DEFAULT_METRIC_BATCH_SIZE = 1000
// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
) )
// tmpmetrics point to batch of metrics ready to be wrote to output.
// readI point to the oldest batch of metrics (the first to sent to output). It
// may point to nil value if tmpmetrics is empty.
// writeI point to the next slot to buffer a batch of metrics is output fail to
// write.
type RunningOutput struct { type RunningOutput struct {
Name string Name string
Output telegraf.Output Output telegraf.Output
Config *OutputConfig Config *OutputConfig
Quiet bool Quiet bool
MetricBufferLimit int MetricBufferLimit int
MetricBatchSize int
FlushBufferWhenFull bool FlushBufferWhenFull bool
metrics []telegraf.Metric metrics []telegraf.Metric
tmpmetrics map[int][]telegraf.Metric tmpmetrics []([]telegraf.Metric)
overwriteI int writeI int
mapI int readI int
sync.Mutex sync.Mutex
} }
@ -40,10 +47,10 @@ func NewRunningOutput(
ro := &RunningOutput{ ro := &RunningOutput{
Name: name, Name: name,
metrics: make([]telegraf.Metric, 0), metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(map[int][]telegraf.Metric),
Output: output, Output: output,
Config: conf, Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT, MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
MetricBatchSize: DEFAULT_METRIC_BATCH_SIZE,
} }
return ro return ro
} }
@ -59,6 +66,17 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
ro.Lock() ro.Lock()
defer ro.Unlock() defer ro.Unlock()
if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1
if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
}
// Filter any tagexclude/taginclude parameters before adding metric // Filter any tagexclude/taginclude parameters before adding metric
if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 { if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
// In order to filter out tags, we need to create a new metric, since // In order to filter out tags, we need to create a new metric, since
@ -72,40 +90,32 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(name, tags, fields, t) metric, _ = telegraf.NewMetric(name, tags, fields, t)
} }
if len(ro.metrics) < ro.MetricBufferLimit { if len(ro.metrics) < ro.MetricBatchSize {
ro.metrics = append(ro.metrics, metric) ro.metrics = append(ro.metrics, metric)
} else { } else {
flushSuccess := true
if ro.FlushBufferWhenFull { if ro.FlushBufferWhenFull {
ro.metrics = append(ro.metrics, metric) err := ro.write(ro.metrics)
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
err := ro.write(tmpmetrics)
if err != nil { if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s", log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err) ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT { flushSuccess = false
ro.mapI = 0
// overwrite one
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
} else {
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
}
} }
} else { } else {
if ro.overwriteI == 0 { flushSuccess = false
}
if !flushSuccess {
if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI {
log.Printf("WARNING: overwriting cached metrics, you may want to " + log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " + "increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n") "config if you do not wish to overwrite metrics.\n")
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
} }
if ro.overwriteI == len(ro.metrics) { ro.tmpmetrics[ro.writeI] = ro.metrics
ro.overwriteI = 0 ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics)
}
ro.metrics[ro.overwriteI] = metric
ro.overwriteI++
} }
ro.metrics = make([]telegraf.Metric, 0)
ro.metrics = append(ro.metrics, metric)
} }
} }
@ -113,21 +123,23 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
func (ro *RunningOutput) Write() error { func (ro *RunningOutput) Write() error {
ro.Lock() ro.Lock()
defer ro.Unlock() defer ro.Unlock()
// Write any cached metric buffers before, as those metrics are the
// oldest
for ro.tmpmetrics[ro.readI] != nil {
if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil {
return err
} else {
ro.tmpmetrics[ro.readI] = nil
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
}
}
err := ro.write(ro.metrics) err := ro.write(ro.metrics)
if err != nil { if err != nil {
return err return err
} else { } else {
ro.metrics = make([]telegraf.Metric, 0) ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteI = 0
}
// Write any cached metric buffers that failed previously
for i, tmpmetrics := range ro.tmpmetrics {
if err := ro.write(tmpmetrics); err != nil {
return err
} else {
delete(ro.tmpmetrics, i)
}
} }
return nil return nil

View File

@ -193,7 +193,7 @@ func TestRunningOutputDefault(t *testing.T) {
assert.Len(t, m.Metrics(), 10) assert.Len(t, m.Metrics(), 10)
} }
// Test that the first metric gets overwritten if there is a buffer overflow. // Test that the first metrics batch gets overwritten if there is a buffer overflow.
func TestRunningOutputOverwrite(t *testing.T) { func TestRunningOutputOverwrite(t *testing.T) {
conf := &OutputConfig{ conf := &OutputConfig{
Filter: Filter{ Filter: Filter{
@ -203,6 +203,7 @@ func TestRunningOutputOverwrite(t *testing.T) {
m := &mockOutput{} m := &mockOutput{}
ro := NewRunningOutput("test", m, conf) ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 4 ro.MetricBufferLimit = 4
for _, metric := range first5 { for _, metric := range first5 {
@ -236,6 +237,7 @@ func TestRunningOutputMultiOverwrite(t *testing.T) {
m := &mockOutput{} m := &mockOutput{}
ro := NewRunningOutput("test", m, conf) ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 3 ro.MetricBufferLimit = 3
for _, metric := range first5 { for _, metric := range first5 {
@ -274,7 +276,8 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
m := &mockOutput{} m := &mockOutput{}
ro := NewRunningOutput("test", m, conf) ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 5 ro.MetricBatchSize = 5
ro.MetricBufferLimit = 10
// Fill buffer to limit // Fill buffer to limit
for _, metric := range first5 { for _, metric := range first5 {
@ -286,7 +289,7 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
// add one more metric // add one more metric
ro.AddMetric(next5[0]) ro.AddMetric(next5[0])
// now it flushed // now it flushed
assert.Len(t, m.Metrics(), 6) assert.Len(t, m.Metrics(), 5)
// add one more metric and write it manually // add one more metric and write it manually
ro.AddMetric(next5[1]) ro.AddMetric(next5[1])
@ -307,7 +310,8 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
m := &mockOutput{} m := &mockOutput{}
ro := NewRunningOutput("test", m, conf) ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4 ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12
// Fill buffer past limit twive // Fill buffer past limit twive
for _, metric := range first5 { for _, metric := range first5 {
@ -317,7 +321,7 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
ro.AddMetric(metric) ro.AddMetric(metric)
} }
// flushed twice // flushed twice
assert.Len(t, m.Metrics(), 10) assert.Len(t, m.Metrics(), 8)
} }
func TestRunningOutputWriteFail(t *testing.T) { func TestRunningOutputWriteFail(t *testing.T) {
@ -331,7 +335,8 @@ func TestRunningOutputWriteFail(t *testing.T) {
m.failWrite = true m.failWrite = true
ro := NewRunningOutput("test", m, conf) ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4 ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12
// Fill buffer past limit twice // Fill buffer past limit twice
for _, metric := range first5 { for _, metric := range first5 {