diff --git a/internal/models/running_output.go b/internal/models/running_output.go index c926917d6..014202454 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -94,6 +94,9 @@ func NewRunningOutput( // AddMetric adds a metric to the output. This function can also write cached // points if FlushBufferWhenFull is true. func (ro *RunningOutput) AddMetric(m telegraf.Metric) { + ro.Lock() + defer ro.Unlock() + if m == nil { return } @@ -115,8 +118,6 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { } if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { - ro.Lock() - defer ro.Unlock() output.Add(m) return } @@ -134,6 +135,9 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { + ro.Lock() + defer ro.Unlock() + if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { metrics := output.Push() ro.metrics.Add(metrics...) @@ -188,8 +192,6 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { if nMetrics == 0 { return nil } - ro.Lock() - defer ro.Unlock() start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start)