Prevent possible deadlock when using aggregators (#3016)
Looping the metrics back through the same channel could result in a deadlock. By using a new channel and locking the processor, we can ensure that all stages can make continual progress.
This commit is contained in:
		
							parent
							
								
									d9d1ca5a46
								
							
						
					
					
						commit
						b165ce4cd5
					
				|  | @ -247,7 +247,7 @@ func (a *Agent) flush() { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // flusher monitors the metrics input channel and flushes on the minimum interval
 | // flusher monitors the metrics input channel and flushes on the minimum interval
 | ||||||
| func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { | func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, aggC chan telegraf.Metric) error { | ||||||
| 	// Inelegant, but this sleep is to allow the Gather threads to run, so that
 | 	// Inelegant, but this sleep is to allow the Gather threads to run, so that
 | ||||||
| 	// the flusher will flush after metrics are collected.
 | 	// the flusher will flush after metrics are collected.
 | ||||||
| 	time.Sleep(time.Millisecond * 300) | 	time.Sleep(time.Millisecond * 300) | ||||||
|  | @ -291,6 +291,29 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
|  | 	wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer wg.Done() | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-shutdown: | ||||||
|  | 				if len(aggC) > 0 { | ||||||
|  | 					// keep going until aggC is flushed
 | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				return | ||||||
|  | 			case metric := <-aggC: | ||||||
|  | 				metrics := []telegraf.Metric{metric} | ||||||
|  | 				for _, processor := range a.Config.Processors { | ||||||
|  | 					metrics = processor.Apply(metrics...) | ||||||
|  | 				} | ||||||
|  | 				for _, m := range metrics { | ||||||
|  | 					outMetricC <- m | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
| 	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) | 	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) | ||||||
| 	semaphore := make(chan struct{}, 1) | 	semaphore := make(chan struct{}, 1) | ||||||
| 	for { | 	for { | ||||||
|  | @ -339,6 +362,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { | ||||||
| 
 | 
 | ||||||
| 	// channel shared between all input threads for accumulating metrics
 | 	// channel shared between all input threads for accumulating metrics
 | ||||||
| 	metricC := make(chan telegraf.Metric, 100) | 	metricC := make(chan telegraf.Metric, 100) | ||||||
|  | 	aggC := make(chan telegraf.Metric, 100) | ||||||
| 
 | 
 | ||||||
| 	// Start all ServicePlugins
 | 	// Start all ServicePlugins
 | ||||||
| 	for _, input := range a.Config.Inputs { | 	for _, input := range a.Config.Inputs { | ||||||
|  | @ -367,7 +391,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { | ||||||
| 	wg.Add(1) | 	wg.Add(1) | ||||||
| 	go func() { | 	go func() { | ||||||
| 		defer wg.Done() | 		defer wg.Done() | ||||||
| 		if err := a.flusher(shutdown, metricC); err != nil { | 		if err := a.flusher(shutdown, metricC, aggC); err != nil { | ||||||
| 			log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) | 			log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error()) | ||||||
| 			close(shutdown) | 			close(shutdown) | ||||||
| 		} | 		} | ||||||
|  | @ -377,7 +401,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { | ||||||
| 	for _, aggregator := range a.Config.Aggregators { | 	for _, aggregator := range a.Config.Aggregators { | ||||||
| 		go func(agg *models.RunningAggregator) { | 		go func(agg *models.RunningAggregator) { | ||||||
| 			defer wg.Done() | 			defer wg.Done() | ||||||
| 			acc := NewAccumulator(agg, metricC) | 			acc := NewAccumulator(agg, aggC) | ||||||
| 			acc.SetPrecision(a.Config.Agent.Precision.Duration, | 			acc.SetPrecision(a.Config.Agent.Precision.Duration, | ||||||
| 				a.Config.Agent.Interval.Duration) | 				a.Config.Agent.Interval.Duration) | ||||||
| 			agg.Run(acc, shutdown) | 			agg.Run(acc, shutdown) | ||||||
|  |  | ||||||
|  | @ -1,11 +1,15 @@ | ||||||
| package models | package models | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"sync" | ||||||
|  | 
 | ||||||
| 	"github.com/influxdata/telegraf" | 	"github.com/influxdata/telegraf" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type RunningProcessor struct { | type RunningProcessor struct { | ||||||
| 	Name      string | 	Name string | ||||||
|  | 
 | ||||||
|  | 	sync.Mutex | ||||||
| 	Processor telegraf.Processor | 	Processor telegraf.Processor | ||||||
| 	Config    *ProcessorConfig | 	Config    *ProcessorConfig | ||||||
| } | } | ||||||
|  | @ -24,6 +28,9 @@ type ProcessorConfig struct { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { | func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { | ||||||
|  | 	rp.Lock() | ||||||
|  | 	defer rp.Unlock() | ||||||
|  | 
 | ||||||
| 	ret := []telegraf.Metric{} | 	ret := []telegraf.Metric{} | ||||||
| 
 | 
 | ||||||
| 	for _, metric := range in { | 	for _, metric := range in { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue