diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 9f783f87a..3e4e62adc 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -38,8 +38,12 @@ config. * **interval**: Default data collection interval for all inputs * **round_interval**: Rounds collection interval to 'interval' ie, if interval="10s" then always collect on :00, :10, :20, etc. +* **metric_batch_size**: Telegraf will send metrics to output in batch of at +most metric_batch_size metrics. * **metric_buffer_limit**: Telegraf will cache metric_buffer_limit metrics for each output, and will flush this buffer on a successful write. +This should be a multiple of metric_batch_size and could not be less +than 2 times metric_batch_size. * **collection_jitter**: Collection jitter is used to jitter the collection by a random amount. Each plugin will sleep for a random time within jitter before collecting. diff --git a/etc/telegraf.conf b/etc/telegraf.conf index dc41eaa96..46b422ffa 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -30,9 +30,13 @@ ## ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true + ## Telegraf will send metrics to output in batch of at + ## most metric_batch_size metrics. + metric_batch_size = 1000 ## Telegraf will cache metric_buffer_limit metrics for each output, and will - ## flush this buffer on a successful write. - metric_buffer_limit = 1000 + ## flush this buffer on a successful write. This should be a multiple of + ## metric_batch_size and could not be less than 2 times metric_batch_size + metric_buffer_limit = 10000 ## Flush the buffer whenever full, regardless of flush_interval. flush_buffer_when_full = true diff --git a/internal/config/config.go b/internal/config/config.go index 5d0836964..fcebd24e6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -93,9 +93,15 @@ type AgentConfig struct { // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s FlushJitter internal.Duration + // MetricBatchSize is the maximum number of metrics that is wrote to an + // output plugin in one call. + MetricBatchSize int + // MetricBufferLimit is the max number of metrics that each output plugin // will cache. The buffer is cleared when a successful write occurs. When - // full, the oldest metrics will be overwritten. + // full, the oldest metrics will be overwritten. This number should be a + // multiple of MetricBatchSize. Due to current implementation, this could + // not be less than 2 times MetricBatchSize. MetricBufferLimit int // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever @@ -182,9 +188,13 @@ var header = `# Telegraf Configuration ## ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true + ## Telegraf will send metrics to output in batch of at + ## most metric_batch_size metrics. + metric_batch_size = 1000 ## Telegraf will cache metric_buffer_limit metrics for each output, and will - ## flush this buffer on a successful write. - metric_buffer_limit = 1000 + ## flush this buffer on a successful write. This should be a multiple of + ## metric_batch_size and could not be less than 2 times metric_batch_size + metric_buffer_limit = 10000 ## Flush the buffer whenever full, regardless of flush_interval. flush_buffer_when_full = true @@ -526,6 +536,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error { } ro := internal_models.NewRunningOutput(name, output, outputConfig) + if c.Agent.MetricBatchSize > 0 { + ro.MetricBatchSize = c.Agent.MetricBatchSize + } if c.Agent.MetricBufferLimit > 0 { ro.MetricBufferLimit = c.Agent.MetricBufferLimit } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index c76dffcdf..91b200799 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -9,25 +9,32 @@ import ( ) const ( - // Default number of metrics kept between flushes. - DEFAULT_METRIC_BUFFER_LIMIT = 1000 - // Limit how many full metric buffers are kept due to failed writes. - FULL_METRIC_BUFFERS_LIMIT = 100 + // Default size of metrics batch size. + DEFAULT_METRIC_BATCH_SIZE = 1000 + + // Default number of metrics kept. It should be a multiple of batch size. + DEFAULT_METRIC_BUFFER_LIMIT = 10000 ) +// tmpmetrics point to batch of metrics ready to be wrote to output. +// readI point to the oldest batch of metrics (the first to sent to output). It +// may point to nil value if tmpmetrics is empty. +// writeI point to the next slot to buffer a batch of metrics is output fail to +// write. type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig Quiet bool MetricBufferLimit int + MetricBatchSize int FlushBufferWhenFull bool metrics []telegraf.Metric - tmpmetrics map[int][]telegraf.Metric - overwriteI int - mapI int + tmpmetrics []([]telegraf.Metric) + writeI int + readI int sync.Mutex } @@ -40,10 +47,10 @@ func NewRunningOutput( ro := &RunningOutput{ Name: name, metrics: make([]telegraf.Metric, 0), - tmpmetrics: make(map[int][]telegraf.Metric), Output: output, Config: conf, MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT, + MetricBatchSize: DEFAULT_METRIC_BATCH_SIZE, } return ro } @@ -59,6 +66,17 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { ro.Lock() defer ro.Unlock() + if ro.tmpmetrics == nil { + size := ro.MetricBufferLimit / ro.MetricBatchSize + // ro.metrics already contains one batch + size = size - 1 + + if size < 1 { + size = 1 + } + ro.tmpmetrics = make([]([]telegraf.Metric), size) + } + // Filter any tagexclude/taginclude parameters before adding metric if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 { // In order to filter out tags, we need to create a new metric, since @@ -72,40 +90,32 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { metric, _ = telegraf.NewMetric(name, tags, fields, t) } - if len(ro.metrics) < ro.MetricBufferLimit { + if len(ro.metrics) < ro.MetricBatchSize { ro.metrics = append(ro.metrics, metric) } else { + flushSuccess := true if ro.FlushBufferWhenFull { - ro.metrics = append(ro.metrics, metric) - tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) - copy(tmpmetrics, ro.metrics) - ro.metrics = make([]telegraf.Metric, 0) - err := ro.write(tmpmetrics) + err := ro.write(ro.metrics) if err != nil { log.Printf("ERROR writing full metric buffer to output %s, %s", ro.Name, err) - if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT { - ro.mapI = 0 - // overwrite one - ro.tmpmetrics[ro.mapI] = tmpmetrics - ro.mapI++ - } else { - ro.tmpmetrics[ro.mapI] = tmpmetrics - ro.mapI++ - } + flushSuccess = false } } else { - if ro.overwriteI == 0 { + flushSuccess = false + } + if !flushSuccess { + if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI { log.Printf("WARNING: overwriting cached metrics, you may want to " + "increase the metric_buffer_limit setting in your [agent] " + "config if you do not wish to overwrite metrics.\n") + ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics) } - if ro.overwriteI == len(ro.metrics) { - ro.overwriteI = 0 - } - ro.metrics[ro.overwriteI] = metric - ro.overwriteI++ + ro.tmpmetrics[ro.writeI] = ro.metrics + ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics) } + ro.metrics = make([]telegraf.Metric, 0) + ro.metrics = append(ro.metrics, metric) } } @@ -113,21 +123,23 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { func (ro *RunningOutput) Write() error { ro.Lock() defer ro.Unlock() + + // Write any cached metric buffers before, as those metrics are the + // oldest + for ro.tmpmetrics[ro.readI] != nil { + if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil { + return err + } else { + ro.tmpmetrics[ro.readI] = nil + ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics) + } + } + err := ro.write(ro.metrics) if err != nil { return err } else { ro.metrics = make([]telegraf.Metric, 0) - ro.overwriteI = 0 - } - - // Write any cached metric buffers that failed previously - for i, tmpmetrics := range ro.tmpmetrics { - if err := ro.write(tmpmetrics); err != nil { - return err - } else { - delete(ro.tmpmetrics, i) - } } return nil diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index 9607f2417..ca7034b61 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -193,7 +193,7 @@ func TestRunningOutputDefault(t *testing.T) { assert.Len(t, m.Metrics(), 10) } -// Test that the first metric gets overwritten if there is a buffer overflow. +// Test that the first metrics batch gets overwritten if there is a buffer overflow. func TestRunningOutputOverwrite(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ @@ -203,6 +203,7 @@ func TestRunningOutputOverwrite(t *testing.T) { m := &mockOutput{} ro := NewRunningOutput("test", m, conf) + ro.MetricBatchSize = 1 ro.MetricBufferLimit = 4 for _, metric := range first5 { @@ -236,6 +237,7 @@ func TestRunningOutputMultiOverwrite(t *testing.T) { m := &mockOutput{} ro := NewRunningOutput("test", m, conf) + ro.MetricBatchSize = 1 ro.MetricBufferLimit = 3 for _, metric := range first5 { @@ -274,7 +276,8 @@ func TestRunningOutputFlushWhenFull(t *testing.T) { m := &mockOutput{} ro := NewRunningOutput("test", m, conf) ro.FlushBufferWhenFull = true - ro.MetricBufferLimit = 5 + ro.MetricBatchSize = 5 + ro.MetricBufferLimit = 10 // Fill buffer to limit for _, metric := range first5 { @@ -286,7 +289,7 @@ func TestRunningOutputFlushWhenFull(t *testing.T) { // add one more metric ro.AddMetric(next5[0]) // now it flushed - assert.Len(t, m.Metrics(), 6) + assert.Len(t, m.Metrics(), 5) // add one more metric and write it manually ro.AddMetric(next5[1]) @@ -307,7 +310,8 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) { m := &mockOutput{} ro := NewRunningOutput("test", m, conf) ro.FlushBufferWhenFull = true - ro.MetricBufferLimit = 4 + ro.MetricBatchSize = 4 + ro.MetricBufferLimit = 12 // Fill buffer past limit twive for _, metric := range first5 { @@ -317,7 +321,7 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) { ro.AddMetric(metric) } // flushed twice - assert.Len(t, m.Metrics(), 10) + assert.Len(t, m.Metrics(), 8) } func TestRunningOutputWriteFail(t *testing.T) { @@ -331,7 +335,8 @@ func TestRunningOutputWriteFail(t *testing.T) { m.failWrite = true ro := NewRunningOutput("test", m, conf) ro.FlushBufferWhenFull = true - ro.MetricBufferLimit = 4 + ro.MetricBatchSize = 4 + ro.MetricBufferLimit = 12 // Fill buffer past limit twice for _, metric := range first5 {