package models import ( "log" "sync" "sync/atomic" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/selfstat" ) const ( // Default size of metrics batch size. DEFAULT_METRIC_BATCH_SIZE = 1000 // Default number of metrics kept. It should be a multiple of batch size. DEFAULT_METRIC_BUFFER_LIMIT = 10000 ) // OutputConfig containing name and filter type OutputConfig struct { Name string Filter Filter FlushInterval time.Duration MetricBufferLimit int MetricBatchSize int } // RunningOutput contains the output configuration type RunningOutput struct { // Must be 64-bit aligned newMetricsCount int64 droppedMetrics int64 Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int MetricBatchSize int MetricsFiltered selfstat.Stat WriteTime selfstat.Stat BatchReady chan time.Time buffer *Buffer aggMutex sync.Mutex } func NewRunningOutput( name string, output telegraf.Output, conf *OutputConfig, batchSize int, bufferLimit int, ) *RunningOutput { if conf.MetricBufferLimit > 0 { bufferLimit = conf.MetricBufferLimit } if bufferLimit == 0 { bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT } if conf.MetricBatchSize > 0 { batchSize = conf.MetricBatchSize } if batchSize == 0 { batchSize = DEFAULT_METRIC_BATCH_SIZE } ro := &RunningOutput{ Name: name, buffer: NewBuffer(name, bufferLimit), BatchReady: make(chan time.Time, 1), Output: output, Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, MetricsFiltered: selfstat.Register( "write", "metrics_filtered", map[string]string{"output": name}, ), WriteTime: selfstat.RegisterTiming( "write", "write_time_ns", map[string]string{"output": name}, ), } return ro } func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) { ro.MetricsFiltered.Incr(1) metric.Drop() } func (ro *RunningOutput) Init() error { if p, ok := ro.Output.(telegraf.Initializer); ok { err := p.Init() if err != nil { return err } } return nil } // AddMetric adds a metric to the output. // // Takes ownership of metric func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { if ok := ro.Config.Filter.Select(metric); !ok { ro.metricFiltered(metric) return } ro.Config.Filter.Modify(metric) if len(metric.FieldList()) == 0 { ro.metricFiltered(metric) return } if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { ro.aggMutex.Lock() output.Add(metric) ro.aggMutex.Unlock() return } dropped := ro.buffer.Add(metric) atomic.AddInt64(&ro.droppedMetrics, int64(dropped)) count := atomic.AddInt64(&ro.newMetricsCount, 1) if count == int64(ro.MetricBatchSize) { atomic.StoreInt64(&ro.newMetricsCount, 0) select { case ro.BatchReady <- time.Now(): default: } } } // Write writes all metrics to the output, stopping when all have been sent on // or error. func (ro *RunningOutput) Write() error { if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { ro.aggMutex.Lock() metrics := output.Push() ro.buffer.Add(metrics...) output.Reset() ro.aggMutex.Unlock() } atomic.StoreInt64(&ro.newMetricsCount, 0) // Only process the metrics in the buffer now. Metrics added while we are // writing will be sent on the next call. nBuffer := ro.buffer.Len() nBatches := nBuffer/ro.MetricBatchSize + 1 for i := 0; i < nBatches; i++ { batch := ro.buffer.Batch(ro.MetricBatchSize) if len(batch) == 0 { break } err := ro.write(batch) if err != nil { ro.buffer.Reject(batch) return err } ro.buffer.Accept(batch) } return nil } // WriteBatch writes a single batch of metrics to the output. func (ro *RunningOutput) WriteBatch() error { batch := ro.buffer.Batch(ro.MetricBatchSize) if len(batch) == 0 { return nil } err := ro.write(batch) if err != nil { ro.buffer.Reject(batch) return err } ro.buffer.Accept(batch) return nil } func (ro *RunningOutput) Close() { err := ro.Output.Close() if err != nil { log.Printf("E! [outputs.%s] Error closing output: %v", ro.Name, err) } } func (ro *RunningOutput) write(metrics []telegraf.Metric) error { dropped := atomic.LoadInt64(&ro.droppedMetrics) if dropped > 0 { log.Printf("W! [outputs.%s] Metric buffer overflow; %d metrics have been dropped", ro.Name, dropped) atomic.StoreInt64(&ro.droppedMetrics, 0) } start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start) ro.WriteTime.Incr(elapsed.Nanoseconds()) if err == nil { log.Printf("D! [outputs.%s] wrote batch of %d metrics in %s\n", ro.Name, len(metrics), elapsed) } return err } func (ro *RunningOutput) LogBufferStatus() { nBuffer := ro.buffer.Len() log.Printf("D! [outputs.%s] buffer fullness: %d / %d metrics. ", ro.Name, nBuffer, ro.MetricBufferLimit) }