diff --git a/agent/agent.go b/agent/agent.go index e82caf148..b6db56c5b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -247,7 +247,7 @@ func (a *Agent) flush() { } // flusher monitors the metrics input channel and flushes on the minimum interval -func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { +func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, aggC chan telegraf.Metric) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 300) @@ -291,6 +291,29 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er } }() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-shutdown: + if len(aggC) > 0 { + // keep going until aggC is flushed + continue + } + return + case metric := <-aggC: + metrics := []telegraf.Metric{metric} + for _, processor := range a.Config.Processors { + metrics = processor.Apply(metrics...) + } + for _, m := range metrics { + outMetricC <- m + } + } + } + }() + ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) semaphore := make(chan struct{}, 1) for { @@ -339,6 +362,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { // channel shared between all input threads for accumulating metrics metricC := make(chan telegraf.Metric, 100) + aggC := make(chan telegraf.Metric, 100) // Start all ServicePlugins for _, input := range a.Config.Inputs { @@ -367,7 +391,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { wg.Add(1) go func() { defer wg.Done() - if err := a.flusher(shutdown, metricC); err != nil { + if err := a.flusher(shutdown, metricC, aggC); err != nil { log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) close(shutdown) } @@ -377,7 +401,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { for _, aggregator := range a.Config.Aggregators { go func(agg *models.RunningAggregator) { defer wg.Done() - acc := NewAccumulator(agg, metricC) + acc := NewAccumulator(agg, aggC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) agg.Run(acc, shutdown) diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 600b61928..92d3d44d0 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -1,11 +1,15 @@ package models import ( + "sync" + "github.com/influxdata/telegraf" ) type RunningProcessor struct { - Name string + Name string + + sync.Mutex Processor telegraf.Processor Config *ProcessorConfig } @@ -24,6 +28,9 @@ type ProcessorConfig struct { } func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + rp.Lock() + defer rp.Unlock() + ret := []telegraf.Metric{} for _, metric := range in {