Fix potential deadlock by not calling AddMetric concurrently (#4404)

This commit is contained in:
Daniel Nelson 2018-07-11 17:33:27 -07:00 committed by GitHub
parent fb7c1d775b
commit 9e77bfc3ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 48 deletions

View File

@ -241,14 +241,12 @@ func (a *Agent) flush() {
}
// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, aggC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 300)
// create an output metric channel and a gorouting that continuously passes
// each metric onto the output plugins & aggregators.
outMetricC := make(chan telegraf.Metric, 100)
func (a *Agent) flusher(
shutdown chan struct{},
metricC chan telegraf.Metric,
aggMetricC chan telegraf.Metric,
outMetricC chan telegraf.Metric,
) error {
var wg sync.WaitGroup
wg.Add(1)
go func() {
@ -257,26 +255,16 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
select {
case <-shutdown:
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
// keep going until channel is empty
continue
}
return
case m := <-outMetricC:
// if dropOriginal is set to true, then we will only send this
// metric to the aggregators, not the outputs.
var dropOriginal bool
for _, agg := range a.Config.Aggregators {
if ok := agg.Add(m.Copy()); ok {
dropOriginal = true
}
}
if !dropOriginal {
case metric := <-outMetricC:
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
o.AddMetric(metric)
} else {
o.AddMetric(m.Copy())
}
o.AddMetric(metric.Copy())
}
}
}
@ -286,26 +274,47 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
if len(aggC) > 0 {
// keep going until aggC is flushed
continue
}
return
case metric := <-aggC:
for metric := range aggMetricC {
// Apply Processors
metrics := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
metrics = processor.Apply(metrics...)
}
for _, m := range metrics {
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(m.Copy())
outMetricC <- metric
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
if len(metricC) > 0 {
// keep going until channel is empty
continue
}
close(aggMetricC)
return
case metric := <-metricC:
// Apply Processors
metrics := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
metrics = processor.Apply(metrics...)
}
for _, metric := range metrics {
// Apply Aggregators
var dropOriginal bool
for _, agg := range a.Config.Aggregators {
if ok := agg.Add(metric.Copy()); ok {
dropOriginal = true
}
}
// Forward metric to Outputs
if !dropOriginal {
outMetricC <- metric
}
}
}
@ -335,16 +344,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
" already a flush ongoing.")
}
}()
case metric := <-metricC:
// NOTE potential bottleneck here as we put each metric through the
// processors serially.
mS := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
mS = processor.Apply(mS...)
}
for _, m := range mS {
outMetricC <- m
}
}
}
}
@ -358,9 +357,14 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
// channel shared between all input threads for accumulating metrics
// Channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100)
// Channel for metrics ready to be output
outMetricC := make(chan telegraf.Metric, 100)
// Channel for aggregated metrics
aggMetricC := make(chan telegraf.Metric, 100)
// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
@ -371,7 +375,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1)
go func() {
defer wg.Done()
if err := a.flusher(shutdown, metricC, aggC); err != nil {
if err := a.flusher(shutdown, metricC, aggMetricC, outMetricC); err != nil {
log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown)
}
@ -381,7 +385,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
for _, aggregator := range a.Config.Aggregators {
go func(agg *models.RunningAggregator) {
defer wg.Done()
acc := NewAccumulator(agg, aggC)
acc := NewAccumulator(agg, aggMetricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, shutdown)