Fix locking if output is an AggregatingOutput

This commit is contained in:
Daniel Nelson 2018-09-10 15:14:14 -07:00
parent 3618f5dc98
commit 23a8498963
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
1 changed files with 6 additions and 4 deletions

View File

@ -94,6 +94,9 @@ func NewRunningOutput(
// AddMetric adds a metric to the output. This function can also write cached // AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true. // points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(m telegraf.Metric) { func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
ro.Lock()
defer ro.Unlock()
if m == nil { if m == nil {
return return
} }
@ -115,8 +118,6 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
} }
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.Lock()
defer ro.Unlock()
output.Add(m) output.Add(m)
return return
} }
@ -134,6 +135,9 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
// Write writes all cached points to this output. // Write writes all cached points to this output.
func (ro *RunningOutput) Write() error { func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
metrics := output.Push() metrics := output.Push()
ro.metrics.Add(metrics...) ro.metrics.Add(metrics...)
@ -188,8 +192,6 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
if nMetrics == 0 { if nMetrics == 0 {
return nil return nil
} }
ro.Lock()
defer ro.Unlock()
start := time.Now() start := time.Now()
err := ro.Output.Write(metrics) err := ro.Output.Write(metrics)
elapsed := time.Since(start) elapsed := time.Since(start)