From 1aa969aabcbd4c446fccc12558446bf7346fad04 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 12 Sep 2018 00:23:50 -0700 Subject: [PATCH] Use separate mutexes for write and aggregation in running_output --- internal/models/running_output.go | 15 +++++++++------ output.go | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 014202454..bad1f7659 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -36,8 +36,10 @@ type RunningOutput struct { metrics *buffer.Buffer failMetrics *buffer.Buffer + // Guards against concurrent calls to Add, Push, Reset + aggMutex sync.Mutex // Guards against concurrent calls to the Output as described in #3009 - sync.Mutex + writeMutex sync.Mutex } func NewRunningOutput( @@ -94,8 +96,6 @@ func NewRunningOutput( // AddMetric adds a metric to the output. This function can also write cached // points if FlushBufferWhenFull is true. func (ro *RunningOutput) AddMetric(m telegraf.Metric) { - ro.Lock() - defer ro.Unlock() if m == nil { return @@ -118,7 +118,9 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { } if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + ro.aggMutex.Lock() output.Add(m) + ro.aggMutex.Unlock() return } @@ -135,13 +137,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - ro.Lock() - defer ro.Unlock() - if output, ok := ro.Output.(telegraf.AggregatingOutput); ok { + ro.aggMutex.Lock() metrics := output.Push() ro.metrics.Add(metrics...) output.Reset() + ro.aggMutex.Unlock() } nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() @@ -192,6 +193,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { if nMetrics == 0 { return nil } + ro.writeMutex.Lock() + defer ro.writeMutex.Unlock() start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start) diff --git a/output.go b/output.go index 39b371ac4..2421048f0 100644 --- a/output.go +++ b/output.go @@ -13,9 +13,26 @@ type Output interface { Write(metrics []Metric) error } +// AggregatingOutput adds aggregating functionality to an Output. May be used +// if the Output only accepts a fixed set of aggregations over a time period. +// These functions may be called concurrently to the Write function. type AggregatingOutput interface { + // Connect to the Output + Connect() error + // Close any connections to the Output + Close() error + // Description returns a one-sentence description on the Output + Description() string + // SampleConfig returns the default configuration of the Output + SampleConfig() string + // Write takes in group of points to be written to the Output + Write(metrics []Metric) error + + // Add the metric to the aggregator Add(in Metric) + // Push returns the aggregated metrics and is called every flush interval. Push() []Metric + // Reset signals the the aggregator period is completed. Reset() } @@ -30,6 +47,7 @@ type ServiceOutput interface { SampleConfig() string // Write takes in group of points to be written to the Output Write(metrics []Metric) error + // Start the "service" that will provide an Output Start() error // Stop the "service" that will provide an Output