parent
							
								
									07728d7425
								
							
						
					
					
						commit
						c159460b2c
					
				|  | @ -38,8 +38,12 @@ config. | |||
| * **interval**: Default data collection interval for all inputs | ||||
| * **round_interval**: Rounds collection interval to 'interval' | ||||
| ie, if interval="10s" then always collect on :00, :10, :20, etc. | ||||
| * **metric_batch_size**: Telegraf will send metrics to outputs in batches of at | ||||
| most metric_batch_size metrics. | ||||
| * **metric_buffer_limit**: Telegraf will cache metric_buffer_limit metrics | ||||
| for each output, and will flush this buffer on a successful write. | ||||
| This should be a multiple of metric_batch_size and must not be less | ||||
| than 2 times metric_batch_size. | ||||
| * **collection_jitter**: Collection jitter is used to jitter | ||||
| the collection by a random amount. | ||||
| Each plugin will sleep for a random time within jitter before collecting. | ||||
|  |  | |||
|  | @ -30,9 +30,13 @@ | |||
|   ## ie, if interval="10s" then always collect on :00, :10, :20, etc. | ||||
|   round_interval = true | ||||
| 
 | ||||
|   ## Telegraf will send metrics to output in batch of at | ||||
|   ## most metric_batch_size metrics. | ||||
|   metric_batch_size = 1000 | ||||
|   ## Telegraf will cache metric_buffer_limit metrics for each output, and will | ||||
|   ## flush this buffer on a successful write. | ||||
|   metric_buffer_limit = 1000 | ||||
|   ## flush this buffer on a successful write. This should be a multiple of | ||||
|   ## metric_batch_size and could not be less than 2 times metric_batch_size | ||||
|   metric_buffer_limit = 10000 | ||||
|   ## Flush the buffer whenever full, regardless of flush_interval. | ||||
|   flush_buffer_when_full = true | ||||
| 
 | ||||
|  |  | |||
|  | @ -93,9 +93,15 @@ type AgentConfig struct { | |||
| 	// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
 | ||||
| 	FlushJitter internal.Duration | ||||
| 
 | ||||
| 	// MetricBatchSize is the maximum number of metrics that is written to an
 | ||||
| 	// output plugin in one call.
 | ||||
| 	MetricBatchSize int | ||||
| 
 | ||||
| 	// MetricBufferLimit is the max number of metrics that each output plugin
 | ||||
| 	// will cache. The buffer is cleared when a successful write occurs. When
 | ||||
| 	// full, the oldest metrics will be overwritten.
 | ||||
| 	// full, the oldest metrics will be overwritten. This number should be a
 | ||||
| 	// multiple of MetricBatchSize. Due to the current implementation, this must
 | ||||
| 	// not be less than 2 times MetricBatchSize.
 | ||||
| 	MetricBufferLimit int | ||||
| 
 | ||||
| 	// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
 | ||||
|  | @ -182,9 +188,13 @@ var header = `# Telegraf Configuration | |||
|   ## ie, if interval="10s" then always collect on :00, :10, :20, etc. | ||||
|   round_interval = true | ||||
| 
 | ||||
|   ## Telegraf will send metrics to output in batch of at | ||||
|   ## most metric_batch_size metrics. | ||||
|   metric_batch_size = 1000 | ||||
|   ## Telegraf will cache metric_buffer_limit metrics for each output, and will | ||||
|   ## flush this buffer on a successful write. | ||||
|   metric_buffer_limit = 1000 | ||||
|   ## flush this buffer on a successful write. This should be a multiple of | ||||
|   ## metric_batch_size and could not be less than 2 times metric_batch_size | ||||
|   metric_buffer_limit = 10000 | ||||
|   ## Flush the buffer whenever full, regardless of flush_interval. | ||||
|   flush_buffer_when_full = true | ||||
| 
 | ||||
|  | @ -526,6 +536,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error { | |||
| 	} | ||||
| 
 | ||||
| 	ro := internal_models.NewRunningOutput(name, output, outputConfig) | ||||
| 	if c.Agent.MetricBatchSize > 0 { | ||||
| 		ro.MetricBatchSize = c.Agent.MetricBatchSize | ||||
| 	} | ||||
| 	if c.Agent.MetricBufferLimit > 0 { | ||||
| 		ro.MetricBufferLimit = c.Agent.MetricBufferLimit | ||||
| 	} | ||||
|  |  | |||
|  | @ -9,25 +9,32 @@ import ( | |||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	// Default number of metrics kept between flushes.
 | ||||
| 	DEFAULT_METRIC_BUFFER_LIMIT = 1000 | ||||
| 
 | ||||
| 	// Limit how many full metric buffers are kept due to failed writes.
 | ||||
| 	FULL_METRIC_BUFFERS_LIMIT = 100 | ||||
| 	// Default metric batch size.
 | ||||
| 	DEFAULT_METRIC_BATCH_SIZE = 1000 | ||||
| 
 | ||||
| 	// Default number of metrics kept. It should be a multiple of batch size.
 | ||||
| 	DEFAULT_METRIC_BUFFER_LIMIT = 10000 | ||||
| ) | ||||
| 
 | ||||
| // tmpmetrics holds batches of metrics ready to be written to the output.
 | ||||
| // readI points to the oldest batch of metrics (the first to be sent to the
 | ||||
| // output). It may point to a nil value if tmpmetrics is empty.
 | ||||
| // writeI points to the next slot used to buffer a batch of metrics if the
 | ||||
| // output fails to write.
 | ||||
| type RunningOutput struct { | ||||
| 	Name                string | ||||
| 	Output              telegraf.Output | ||||
| 	Config              *OutputConfig | ||||
| 	Quiet               bool | ||||
| 	MetricBufferLimit   int | ||||
| 	MetricBatchSize     int | ||||
| 	FlushBufferWhenFull bool | ||||
| 
 | ||||
| 	metrics    []telegraf.Metric | ||||
| 	tmpmetrics map[int][]telegraf.Metric | ||||
| 	overwriteI int | ||||
| 	mapI       int | ||||
| 	tmpmetrics []([]telegraf.Metric) | ||||
| 	writeI     int | ||||
| 	readI      int | ||||
| 
 | ||||
| 	sync.Mutex | ||||
| } | ||||
|  | @ -40,10 +47,10 @@ func NewRunningOutput( | |||
| 	ro := &RunningOutput{ | ||||
| 		Name:              name, | ||||
| 		metrics:           make([]telegraf.Metric, 0), | ||||
| 		tmpmetrics:        make(map[int][]telegraf.Metric), | ||||
| 		Output:            output, | ||||
| 		Config:            conf, | ||||
| 		MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT, | ||||
| 		MetricBatchSize:   DEFAULT_METRIC_BATCH_SIZE, | ||||
| 	} | ||||
| 	return ro | ||||
| } | ||||
|  | @ -59,6 +66,17 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { | |||
| 	ro.Lock() | ||||
| 	defer ro.Unlock() | ||||
| 
 | ||||
| 	if ro.tmpmetrics == nil { | ||||
| 		size := ro.MetricBufferLimit / ro.MetricBatchSize | ||||
| 		// ro.metrics already contains one batch
 | ||||
| 		size = size - 1 | ||||
| 
 | ||||
| 		if size < 1 { | ||||
| 			size = 1 | ||||
| 		} | ||||
| 		ro.tmpmetrics = make([]([]telegraf.Metric), size) | ||||
| 	} | ||||
| 
 | ||||
| 	// Filter any tagexclude/taginclude parameters before adding metric
 | ||||
| 	if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 { | ||||
| 		// In order to filter out tags, we need to create a new metric, since
 | ||||
|  | @ -72,40 +90,32 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { | |||
| 		metric, _ = telegraf.NewMetric(name, tags, fields, t) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(ro.metrics) < ro.MetricBufferLimit { | ||||
| 	if len(ro.metrics) < ro.MetricBatchSize { | ||||
| 		ro.metrics = append(ro.metrics, metric) | ||||
| 	} else { | ||||
| 		flushSuccess := true | ||||
| 		if ro.FlushBufferWhenFull { | ||||
| 			ro.metrics = append(ro.metrics, metric) | ||||
| 			tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) | ||||
| 			copy(tmpmetrics, ro.metrics) | ||||
| 			ro.metrics = make([]telegraf.Metric, 0) | ||||
| 			err := ro.write(tmpmetrics) | ||||
| 			err := ro.write(ro.metrics) | ||||
| 			if err != nil { | ||||
| 				log.Printf("ERROR writing full metric buffer to output %s, %s", | ||||
| 					ro.Name, err) | ||||
| 				if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT { | ||||
| 					ro.mapI = 0 | ||||
| 					// overwrite one
 | ||||
| 					ro.tmpmetrics[ro.mapI] = tmpmetrics | ||||
| 					ro.mapI++ | ||||
| 				} else { | ||||
| 					ro.tmpmetrics[ro.mapI] = tmpmetrics | ||||
| 					ro.mapI++ | ||||
| 				} | ||||
| 				flushSuccess = false | ||||
| 			} | ||||
| 		} else { | ||||
| 			if ro.overwriteI == 0 { | ||||
| 			flushSuccess = false | ||||
| 		} | ||||
| 		if !flushSuccess { | ||||
| 			if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI { | ||||
| 				log.Printf("WARNING: overwriting cached metrics, you may want to " + | ||||
| 					"increase the metric_buffer_limit setting in your [agent] " + | ||||
| 					"config if you do not wish to overwrite metrics.\n") | ||||
| 				ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics) | ||||
| 			} | ||||
| 			if ro.overwriteI == len(ro.metrics) { | ||||
| 				ro.overwriteI = 0 | ||||
| 			} | ||||
| 			ro.metrics[ro.overwriteI] = metric | ||||
| 			ro.overwriteI++ | ||||
| 			ro.tmpmetrics[ro.writeI] = ro.metrics | ||||
| 			ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics) | ||||
| 		} | ||||
| 		ro.metrics = make([]telegraf.Metric, 0) | ||||
| 		ro.metrics = append(ro.metrics, metric) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -113,21 +123,23 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { | |||
| func (ro *RunningOutput) Write() error { | ||||
| 	ro.Lock() | ||||
| 	defer ro.Unlock() | ||||
| 
 | ||||
| 	// Write any cached metric buffers before, as those metrics are the
 | ||||
| 	// oldest
 | ||||
| 	for ro.tmpmetrics[ro.readI] != nil { | ||||
| 		if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil { | ||||
| 			return err | ||||
| 		} else { | ||||
| 			ro.tmpmetrics[ro.readI] = nil | ||||
| 			ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	err := ro.write(ro.metrics) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else { | ||||
| 		ro.metrics = make([]telegraf.Metric, 0) | ||||
| 		ro.overwriteI = 0 | ||||
| 	} | ||||
| 
 | ||||
| 	// Write any cached metric buffers that failed previously
 | ||||
| 	for i, tmpmetrics := range ro.tmpmetrics { | ||||
| 		if err := ro.write(tmpmetrics); err != nil { | ||||
| 			return err | ||||
| 		} else { | ||||
| 			delete(ro.tmpmetrics, i) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
|  |  | |||
|  | @ -193,7 +193,7 @@ func TestRunningOutputDefault(t *testing.T) { | |||
| 	assert.Len(t, m.Metrics(), 10) | ||||
| } | ||||
| 
 | ||||
| // Test that the first metric gets overwritten if there is a buffer overflow.
 | ||||
| // Test that the first metrics batch gets overwritten if there is a buffer overflow.
 | ||||
| func TestRunningOutputOverwrite(t *testing.T) { | ||||
| 	conf := &OutputConfig{ | ||||
| 		Filter: Filter{ | ||||
|  | @ -203,6 +203,7 @@ func TestRunningOutputOverwrite(t *testing.T) { | |||
| 
 | ||||
| 	m := &mockOutput{} | ||||
| 	ro := NewRunningOutput("test", m, conf) | ||||
| 	ro.MetricBatchSize = 1 | ||||
| 	ro.MetricBufferLimit = 4 | ||||
| 
 | ||||
| 	for _, metric := range first5 { | ||||
|  | @ -236,6 +237,7 @@ func TestRunningOutputMultiOverwrite(t *testing.T) { | |||
| 
 | ||||
| 	m := &mockOutput{} | ||||
| 	ro := NewRunningOutput("test", m, conf) | ||||
| 	ro.MetricBatchSize = 1 | ||||
| 	ro.MetricBufferLimit = 3 | ||||
| 
 | ||||
| 	for _, metric := range first5 { | ||||
|  | @ -274,7 +276,8 @@ func TestRunningOutputFlushWhenFull(t *testing.T) { | |||
| 	m := &mockOutput{} | ||||
| 	ro := NewRunningOutput("test", m, conf) | ||||
| 	ro.FlushBufferWhenFull = true | ||||
| 	ro.MetricBufferLimit = 5 | ||||
| 	ro.MetricBatchSize = 5 | ||||
| 	ro.MetricBufferLimit = 10 | ||||
| 
 | ||||
| 	// Fill buffer to limit
 | ||||
| 	for _, metric := range first5 { | ||||
|  | @ -286,7 +289,7 @@ func TestRunningOutputFlushWhenFull(t *testing.T) { | |||
| 	// add one more metric
 | ||||
| 	ro.AddMetric(next5[0]) | ||||
| 	// now it flushed
 | ||||
| 	assert.Len(t, m.Metrics(), 6) | ||||
| 	assert.Len(t, m.Metrics(), 5) | ||||
| 
 | ||||
| 	// add one more metric and write it manually
 | ||||
| 	ro.AddMetric(next5[1]) | ||||
|  | @ -307,7 +310,8 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) { | |||
| 	m := &mockOutput{} | ||||
| 	ro := NewRunningOutput("test", m, conf) | ||||
| 	ro.FlushBufferWhenFull = true | ||||
| 	ro.MetricBufferLimit = 4 | ||||
| 	ro.MetricBatchSize = 4 | ||||
| 	ro.MetricBufferLimit = 12 | ||||
| 
 | ||||
| 	// Fill buffer past limit twice
 | ||||
| 	for _, metric := range first5 { | ||||
|  | @ -317,7 +321,7 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) { | |||
| 		ro.AddMetric(metric) | ||||
| 	} | ||||
| 	// flushed twice
 | ||||
| 	assert.Len(t, m.Metrics(), 10) | ||||
| 	assert.Len(t, m.Metrics(), 8) | ||||
| } | ||||
| 
 | ||||
| func TestRunningOutputWriteFail(t *testing.T) { | ||||
|  | @ -331,7 +335,8 @@ func TestRunningOutputWriteFail(t *testing.T) { | |||
| 	m.failWrite = true | ||||
| 	ro := NewRunningOutput("test", m, conf) | ||||
| 	ro.FlushBufferWhenFull = true | ||||
| 	ro.MetricBufferLimit = 4 | ||||
| 	ro.MetricBatchSize = 4 | ||||
| 	ro.MetricBufferLimit = 12 | ||||
| 
 | ||||
| 	// Fill buffer past limit twice
 | ||||
| 	for _, metric := range first5 { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue