Add support for per output flush jitter (#6603)
This commit is contained in:
parent
2a8735d1c6
commit
2156a6242e
|
@ -503,6 +503,12 @@ func (a *Agent) runOutputs(
|
||||||
interval = output.Config.FlushInterval
|
interval = output.Config.FlushInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jitter := jitter
|
||||||
|
// Overwrite agent flush_jitter if this plugin has its own.
|
||||||
|
if output.Config.FlushJitter != nil {
|
||||||
|
jitter = *output.Config.FlushJitter
|
||||||
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(output *models.RunningOutput) {
|
go func(output *models.RunningOutput) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
|
@ -127,9 +127,11 @@ The agent table configures Telegraf and the defaults used across all plugins.
|
||||||
flush_interval + flush_jitter.
|
flush_interval + flush_jitter.
|
||||||
|
|
||||||
- **flush_jitter**:
|
- **flush_jitter**:
|
||||||
Jitter the flush [interval][] by a random amount. This is primarily to avoid
|
Default flush jitter for all outputs. This jitters the flush [interval][]
|
||||||
large write spikes for users running a large number of telegraf instances.
|
by a random amount. This is primarily to avoid large write spikes for users
|
||||||
ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s.
|
running a large number of telegraf instances. ie, a jitter of 5s and interval
|
||||||
|
10s means flushes will happen every 10-15s.
|
||||||
|
|
||||||
|
|
||||||
- **precision**:
|
- **precision**:
|
||||||
Collected metrics are rounded to the precision specified as an [interval][].
|
Collected metrics are rounded to the precision specified as an [interval][].
|
||||||
|
@ -260,6 +262,8 @@ Parameters that can be used with any output plugin:
|
||||||
- **alias**: Name an instance of a plugin.
|
- **alias**: Name an instance of a plugin.
|
||||||
- **flush_interval**: The maximum time between flushes. Use this setting to
|
- **flush_interval**: The maximum time between flushes. Use this setting to
|
||||||
override the agent `flush_interval` on a per plugin basis.
|
override the agent `flush_interval` on a per plugin basis.
|
||||||
|
- **flush_jitter**: The amount of time to jitter the flush interval. Use this
|
||||||
|
setting to override the agent `flush_jitter` on a per plugin basis.
|
||||||
- **metric_batch_size**: The maximum number of metrics to send at once. Use
|
- **metric_batch_size**: The maximum number of metrics to send at once. Use
|
||||||
this setting to override the agent `metric_batch_size` on a per plugin basis.
|
this setting to override the agent `metric_batch_size` on a per plugin basis.
|
||||||
- **metric_buffer_limit**: The maximum number of unsent metrics to buffer.
|
- **metric_buffer_limit**: The maximum number of unsent metrics to buffer.
|
||||||
|
@ -275,6 +279,7 @@ Override flush parameters for a single output:
|
||||||
```toml
|
```toml
|
||||||
[agent]
|
[agent]
|
||||||
flush_interval = "10s"
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "5s"
|
||||||
metric_batch_size = 1000
|
metric_batch_size = 1000
|
||||||
|
|
||||||
[[outputs.influxdb]]
|
[[outputs.influxdb]]
|
||||||
|
@ -284,6 +289,7 @@ Override flush parameters for a single output:
|
||||||
[[outputs.file]]
|
[[outputs.file]]
|
||||||
files = [ "stdout" ]
|
files = [ "stdout" ]
|
||||||
flush_interval = "1s"
|
flush_interval = "1s"
|
||||||
|
flush_jitter = "1s"
|
||||||
metric_batch_size = 10
|
metric_batch_size = 10
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -2026,6 +2026,19 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node, ok := tbl.Fields["flush_jitter"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
dur, err := time.ParseDuration(str.Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
oc.FlushJitter = new(time.Duration)
|
||||||
|
*oc.FlushJitter = dur
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
|
if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
|
||||||
if kv, ok := node.(*ast.KeyValue); ok {
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
if integer, ok := kv.Value.(*ast.Integer); ok {
|
if integer, ok := kv.Value.(*ast.Integer); ok {
|
||||||
|
@ -2059,6 +2072,7 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(tbl.Fields, "flush_interval")
|
delete(tbl.Fields, "flush_interval")
|
||||||
|
delete(tbl.Fields, "flush_jitter")
|
||||||
delete(tbl.Fields, "metric_buffer_limit")
|
delete(tbl.Fields, "metric_buffer_limit")
|
||||||
delete(tbl.Fields, "metric_batch_size")
|
delete(tbl.Fields, "metric_batch_size")
|
||||||
delete(tbl.Fields, "alias")
|
delete(tbl.Fields, "alias")
|
||||||
|
|
|
@ -24,6 +24,7 @@ type OutputConfig struct {
|
||||||
Filter Filter
|
Filter Filter
|
||||||
|
|
||||||
FlushInterval time.Duration
|
FlushInterval time.Duration
|
||||||
|
FlushJitter *time.Duration
|
||||||
MetricBufferLimit int
|
MetricBufferLimit int
|
||||||
MetricBatchSize int
|
MetricBatchSize int
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue