Prevent possible deadlock when using aggregators (#3016)

Looping the metrics back through the same channel could result in a
deadlock. By using a new channel and locking the processor, we can ensure
that all stages can make continual progress.
This commit is contained in:
Daniel Nelson 2017-07-13 15:34:21 -07:00 committed by GitHub
parent 88037c8a2c
commit 8567dfe7b1
2 changed files with 35 additions and 4 deletions

View File

@ -247,7 +247,7 @@ func (a *Agent) flush() {
} }
// flusher monitors the metrics input channel and flushes on the minimum interval // flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, aggC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that // Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected. // the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 300) time.Sleep(time.Millisecond * 300)
@ -291,6 +291,29 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
} }
}() }()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
if len(aggC) > 0 {
// keep going until aggC is flushed
continue
}
return
case metric := <-aggC:
metrics := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
metrics = processor.Apply(metrics...)
}
for _, m := range metrics {
outMetricC <- m
}
}
}
}()
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
semaphore := make(chan struct{}, 1) semaphore := make(chan struct{}, 1)
for { for {
@ -339,6 +362,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// channel shared between all input threads for accumulating metrics // channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 100) metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100)
// Start all ServicePlugins // Start all ServicePlugins
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
@ -367,7 +391,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := a.flusher(shutdown, metricC); err != nil { if err := a.flusher(shutdown, metricC, aggC); err != nil {
log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown) close(shutdown)
} }
@ -377,7 +401,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
for _, aggregator := range a.Config.Aggregators { for _, aggregator := range a.Config.Aggregators {
go func(agg *models.RunningAggregator) { go func(agg *models.RunningAggregator) {
defer wg.Done() defer wg.Done()
acc := NewAccumulator(agg, metricC) acc := NewAccumulator(agg, aggC)
acc.SetPrecision(a.Config.Agent.Precision.Duration, acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration) a.Config.Agent.Interval.Duration)
agg.Run(acc, shutdown) agg.Run(acc, shutdown)

View File

@ -1,11 +1,15 @@
package models package models
import ( import (
"sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type RunningProcessor struct { type RunningProcessor struct {
Name string Name string
sync.Mutex
Processor telegraf.Processor Processor telegraf.Processor
Config *ProcessorConfig Config *ProcessorConfig
} }
@ -24,6 +28,9 @@ type ProcessorConfig struct {
} }
func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
ret := []telegraf.Metric{} ret := []telegraf.Metric{}
for _, metric := range in { for _, metric := range in {