Use separate mutexes for write and aggregation in running_output
This commit is contained in:
parent
94566098cc
commit
1aa969aabc
|
@ -36,8 +36,10 @@ type RunningOutput struct {
|
||||||
metrics *buffer.Buffer
|
metrics *buffer.Buffer
|
||||||
failMetrics *buffer.Buffer
|
failMetrics *buffer.Buffer
|
||||||
|
|
||||||
|
// Guards against concurrent calls to Add, Push, Reset
|
||||||
|
aggMutex sync.Mutex
|
||||||
// Guards against concurrent calls to the Output as described in #3009
|
// Guards against concurrent calls to the Output as described in #3009
|
||||||
sync.Mutex
|
writeMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunningOutput(
|
func NewRunningOutput(
|
||||||
|
@ -94,8 +96,6 @@ func NewRunningOutput(
|
||||||
// AddMetric adds a metric to the output. This function can also write cached
|
// AddMetric adds a metric to the output. This function can also write cached
|
||||||
// points if FlushBufferWhenFull is true.
|
// points if FlushBufferWhenFull is true.
|
||||||
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
ro.Lock()
|
|
||||||
defer ro.Unlock()
|
|
||||||
|
|
||||||
if m == nil {
|
if m == nil {
|
||||||
return
|
return
|
||||||
|
@ -118,7 +118,9 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
ro.aggMutex.Lock()
|
||||||
output.Add(m)
|
output.Add(m)
|
||||||
|
ro.aggMutex.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,13 +137,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
|
|
||||||
// Write writes all cached points to this output.
|
// Write writes all cached points to this output.
|
||||||
func (ro *RunningOutput) Write() error {
|
func (ro *RunningOutput) Write() error {
|
||||||
ro.Lock()
|
|
||||||
defer ro.Unlock()
|
|
||||||
|
|
||||||
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
ro.aggMutex.Lock()
|
||||||
metrics := output.Push()
|
metrics := output.Push()
|
||||||
ro.metrics.Add(metrics...)
|
ro.metrics.Add(metrics...)
|
||||||
output.Reset()
|
output.Reset()
|
||||||
|
ro.aggMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
||||||
|
@ -192,6 +193,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
|
||||||
if nMetrics == 0 {
|
if nMetrics == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
ro.writeMutex.Lock()
|
||||||
|
defer ro.writeMutex.Unlock()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := ro.Output.Write(metrics)
|
err := ro.Output.Write(metrics)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
|
|
18
output.go
18
output.go
|
@ -13,9 +13,26 @@ type Output interface {
|
||||||
Write(metrics []Metric) error
|
Write(metrics []Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AggregatingOutput adds aggregating functionality to an Output. May be used
|
||||||
|
// if the Output only accepts a fixed set of aggregations over a time period.
|
||||||
|
// These functions may be called concurrently to the Write function.
|
||||||
type AggregatingOutput interface {
|
type AggregatingOutput interface {
|
||||||
|
// Connect to the Output
|
||||||
|
Connect() error
|
||||||
|
// Close any connections to the Output
|
||||||
|
Close() error
|
||||||
|
// Description returns a one-sentence description on the Output
|
||||||
|
Description() string
|
||||||
|
// SampleConfig returns the default configuration of the Output
|
||||||
|
SampleConfig() string
|
||||||
|
// Write takes in group of points to be written to the Output
|
||||||
|
Write(metrics []Metric) error
|
||||||
|
|
||||||
|
// Add the metric to the aggregator
|
||||||
Add(in Metric)
|
Add(in Metric)
|
||||||
|
// Push returns the aggregated metrics and is called every flush interval.
|
||||||
Push() []Metric
|
Push() []Metric
|
||||||
|
// Reset signals the the aggregator period is completed.
|
||||||
Reset()
|
Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +47,7 @@ type ServiceOutput interface {
|
||||||
SampleConfig() string
|
SampleConfig() string
|
||||||
// Write takes in group of points to be written to the Output
|
// Write takes in group of points to be written to the Output
|
||||||
Write(metrics []Metric) error
|
Write(metrics []Metric) error
|
||||||
|
|
||||||
// Start the "service" that will provide an Output
|
// Start the "service" that will provide an Output
|
||||||
Start() error
|
Start() error
|
||||||
// Stop the "service" that will provide an Output
|
// Stop the "service" that will provide an Output
|
||||||
|
|
Loading…
Reference in New Issue