From 9e77bfc3edcedc67bbab7b66ed978ba9d085db08 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 11 Jul 2018 17:33:27 -0700 Subject: [PATCH] Fix potential deadlock by not calling AddMetric concurrently (#4404) --- agent/agent.go | 100 +++++++++++++++++++++++++------------------------ 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 6eb9505e2..6f7b540f2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -241,14 +241,12 @@ func (a *Agent) flush() { } // flusher monitors the metrics input channel and flushes on the minimum interval -func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, aggC chan telegraf.Metric) error { - // Inelegant, but this sleep is to allow the Gather threads to run, so that - // the flusher will flush after metrics are collected. - time.Sleep(time.Millisecond * 300) - - // create an output metric channel and a gorouting that continuously passes - // each metric onto the output plugins & aggregators. - outMetricC := make(chan telegraf.Metric, 100) +func (a *Agent) flusher( + shutdown chan struct{}, + metricC chan telegraf.Metric, + aggMetricC chan telegraf.Metric, + outMetricC chan telegraf.Metric, +) error { var wg sync.WaitGroup wg.Add(1) go func() { @@ -257,56 +255,67 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag select { case <-shutdown: if len(outMetricC) > 0 { - // keep going until outMetricC is flushed + // keep going until channel is empty continue } return - case m := <-outMetricC: - // if dropOriginal is set to true, then we will only send this - // metric to the aggregators, not the outputs. - var dropOriginal bool - for _, agg := range a.Config.Aggregators { - if ok := agg.Add(m.Copy()); ok { - dropOriginal = true - } - } - if !dropOriginal { - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(m.Copy()) - } + case metric := <-outMetricC: + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(metric) + } else { + o.AddMetric(metric.Copy()) } } } } }() + wg.Add(1) + go func() { + defer wg.Done() + for metric := range aggMetricC { + // Apply Processors + metrics := []telegraf.Metric{metric} + for _, processor := range a.Config.Processors { + metrics = processor.Apply(metrics...) + } + outMetricC <- metric + } + }() + wg.Add(1) go func() { defer wg.Done() for { select { case <-shutdown: - if len(aggC) > 0 { - // keep going until aggC is flushed + if len(metricC) > 0 { + // keep going until channel is empty continue } + close(aggMetricC) return - case metric := <-aggC: + case metric := <-metricC: + // Apply Processors metrics := []telegraf.Metric{metric} for _, processor := range a.Config.Processors { metrics = processor.Apply(metrics...) } - for _, m := range metrics { - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(m.Copy()) + + for _, metric := range metrics { + // Apply Aggregators + var dropOriginal bool + for _, agg := range a.Config.Aggregators { + if ok := agg.Add(metric.Copy()); ok { + dropOriginal = true } } + + // Forward metric to Outputs + if !dropOriginal { + outMetricC <- metric + } } } } @@ -335,16 +344,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag " already a flush ongoing.") } }() - case metric := <-metricC: - // NOTE potential bottleneck here as we put each metric through the - // processors serially. - mS := []telegraf.Metric{metric} - for _, processor := range a.Config.Processors { - mS = processor.Apply(mS...) - } - for _, m := range mS { - outMetricC <- m - } } } } @@ -358,9 +357,14 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet, a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) - // channel shared between all input threads for accumulating metrics + // Channel shared between all input threads for accumulating metrics metricC := make(chan telegraf.Metric, 100) - aggC := make(chan telegraf.Metric, 100) + + // Channel for metrics ready to be output + outMetricC := make(chan telegraf.Metric, 100) + + // Channel for aggregated metrics + aggMetricC := make(chan telegraf.Metric, 100) // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { @@ -371,7 +375,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { wg.Add(1) go func() { defer wg.Done() - if err := a.flusher(shutdown, metricC, aggC); err != nil { + if err := a.flusher(shutdown, metricC, aggMetricC, outMetricC); err != nil { log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) close(shutdown) } @@ -381,7 +385,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { for _, aggregator := range a.Config.Aggregators { go func(agg *models.RunningAggregator) { defer wg.Done() - acc := NewAccumulator(agg, aggC) + acc := NewAccumulator(agg, aggMetricC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) agg.Run(acc, shutdown)