package models import ( "log" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" "github.com/influxdata/telegraf/metric" ) const ( // Default size of metrics batch size. DEFAULT_METRIC_BATCH_SIZE = 1000 // Default number of metrics kept. It should be a multiple of batch size. DEFAULT_METRIC_BUFFER_LIMIT = 10000 ) // RunningOutput contains the output configuration type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig Quiet bool MetricBufferLimit int MetricBatchSize int metrics *buffer.Buffer failMetrics *buffer.Buffer } func NewRunningOutput( name string, output telegraf.Output, conf *OutputConfig, batchSize int, bufferLimit int, ) *RunningOutput { if bufferLimit == 0 { bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT } if batchSize == 0 { batchSize = DEFAULT_METRIC_BATCH_SIZE } ro := &RunningOutput{ Name: name, metrics: buffer.NewBuffer(batchSize), failMetrics: buffer.NewBuffer(bufferLimit), Output: output, Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, } return ro } // AddMetric adds a metric to the output. This function can also write cached // points if FlushBufferWhenFull is true. func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Filter any tagexclude/taginclude parameters before adding metric if ro.Config.Filter.IsActive() { // In order to filter out tags, we need to create a new metric, since // metrics are immutable once created. name := m.Name() tags := m.Tags() fields := m.Fields() t := m.Time() if ok := ro.Config.Filter.Apply(name, fields, tags); !ok { return } // error is not possible if creating from another metric, so ignore. m, _ = metric.New(name, tags, fields, t) } ro.metrics.Add(m) if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) if err != nil { ro.failMetrics.Add(batch...) } } } // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { if !ro.Quiet { log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ "Total gathered metrics: %d. Total dropped metrics: %d.", ro.Name, ro.failMetrics.Len()+ro.metrics.Len(), ro.MetricBufferLimit, ro.metrics.Total(), ro.metrics.Drops()+ro.failMetrics.Drops()) } var err error if !ro.failMetrics.IsEmpty() { bufLen := ro.failMetrics.Len() // how many batches of failed writes we need to write. nBatches := bufLen/ro.MetricBatchSize + 1 batchSize := ro.MetricBatchSize for i := 0; i < nBatches; i++ { // If it's the last batch, only grab the metrics that have not had // a write attempt already (this is primarily to preserve order). if i == nBatches-1 { batchSize = bufLen % ro.MetricBatchSize } batch := ro.failMetrics.Batch(batchSize) // If we've already failed previous writes, don't bother trying to // write to this output again. We are not exiting the loop just so // that we can rotate the metrics to preserve order. if err == nil { err = ro.write(batch) } if err != nil { ro.failMetrics.Add(batch...) } } } batch := ro.metrics.Batch(ro.MetricBatchSize) // see comment above about not trying to write to an already failed output. // if ro.failMetrics is empty then err will always be nil at this point. if err == nil { err = ro.write(batch) } if err != nil { ro.failMetrics.Add(batch...) return err } return nil } func (ro *RunningOutput) write(metrics []telegraf.Metric) error { if metrics == nil || len(metrics) == 0 { return nil } start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { if !ro.Quiet { log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", ro.Name, len(metrics), elapsed) } } return err } // OutputConfig containing name and filter type OutputConfig struct { Name string Filter Filter }