Compare commits
1 Commits
master
...
aggregatin
Author | SHA1 | Date |
---|---|---|
Daniel Nelson | 4f61d2a09c |
|
@ -113,6 +113,11 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
m, _ = metric.New(name, tags, fields, t)
|
m, _ = metric.New(name, tags, fields, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
output.Add(m)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ro.metrics.Add(m)
|
ro.metrics.Add(m)
|
||||||
if ro.metrics.Len() == ro.MetricBatchSize {
|
if ro.metrics.Len() == ro.MetricBatchSize {
|
||||||
batch := ro.metrics.Batch(ro.MetricBatchSize)
|
batch := ro.metrics.Batch(ro.MetricBatchSize)
|
||||||
|
@ -125,6 +130,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
|
|
||||||
// Write writes all cached points to this output.
|
// Write writes all cached points to this output.
|
||||||
func (ro *RunningOutput) Write() error {
|
func (ro *RunningOutput) Write() error {
|
||||||
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
metrics := output.Push()
|
||||||
|
ro.metrics.Add(metrics...)
|
||||||
|
output.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
||||||
ro.BufferSize.Set(int64(nFails + nMetrics))
|
ro.BufferSize.Set(int64(nFails + nMetrics))
|
||||||
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",
|
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",
|
||||||
|
|
|
@ -13,6 +13,12 @@ type Output interface {
|
||||||
Write(metrics []Metric) error
|
Write(metrics []Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AggregatingOutput interface {
|
||||||
|
Add(in Metric)
|
||||||
|
Push() []Metric
|
||||||
|
Reset()
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceOutput interface {
|
type ServiceOutput interface {
|
||||||
// Connect to the Output
|
// Connect to the Output
|
||||||
Connect() error
|
Connect() error
|
||||||
|
|
Loading…
Reference in New Issue