Add final aggregator (#5820)

This commit is contained in:
Daniel Nelson
2019-05-15 14:46:28 -07:00
committed by GitHub
parent d645e0303f
commit a724bf487f
7 changed files with 402 additions and 4 deletions

View File

@@ -2,6 +2,7 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
_ "github.com/influxdata/telegraf/plugins/aggregators/final"
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"

View File

@@ -0,0 +1,48 @@
# Final Aggregator Plugin
The final aggregator emits the last metric of a contiguous series. A
contiguous series is defined as a series which receives updates within the
time period in `series_timeout`. The contiguous series may be longer than the
time interval defined by `period`.
This is useful for getting the final value for data sources that produce
discrete time series such as procstat, cgroup, kubernetes etc.
When a series has not been updated within the time defined in
`series_timeout`, the last metric is emitted with the `_final` appended.
### Configuration
```toml
[[aggregators.final]]
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The time that a series is not updated until considering it final.
series_timeout = "5m"
```
### Metrics
Measurement and tags are unchanged, fields are emitted with the suffix
`_final`.
### Example Output
```
counter,host=bar i_final=3,j_final=6 1554281635115090133
counter,host=foo i_final=3,j_final=6 1554281635112992012
```
Original input:
```
counter,host=bar i=1,j=4 1554281633101153300
counter,host=foo i=1,j=4 1554281633099323601
counter,host=bar i=2,j=5 1554281634107980073
counter,host=foo i=2,j=5 1554281634105931116
counter,host=bar i=3,j=6 1554281635115090133
counter,host=foo i=3,j=6 1554281635112992012
```

View File

@@ -0,0 +1,72 @@
package final
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/aggregators"
)
var sampleConfig = `
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The time that a series is not updated until considering it final.
series_timeout = "5m"
`
type Final struct {
SeriesTimeout internal.Duration `toml:"series_timeout"`
// The last metric for all series which are active
metricCache map[uint64]telegraf.Metric
}
func NewFinal() *Final {
return &Final{
SeriesTimeout: internal.Duration{Duration: 5 * time.Minute},
metricCache: make(map[uint64]telegraf.Metric),
}
}
func (m *Final) SampleConfig() string {
return sampleConfig
}
func (m *Final) Description() string {
return "Report the final metric of a series"
}
func (m *Final) Add(in telegraf.Metric) {
id := in.HashID()
m.metricCache[id] = in
}
func (m *Final) Push(acc telegraf.Accumulator) {
// Preserve timestamp of original metric
acc.SetPrecision(time.Nanosecond)
for id, metric := range m.metricCache {
if time.Since(metric.Time()) > m.SeriesTimeout.Duration {
fields := map[string]interface{}{}
for _, field := range metric.FieldList() {
fields[field.Key+"_final"] = field.Value
}
acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time())
delete(m.metricCache, id)
}
}
}
func (m *Final) Reset() {
}
func init() {
aggregators.Add("final", func() telegraf.Aggregator {
return NewFinal()
})
}

View File

@@ -0,0 +1,144 @@
package final
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
func TestSimple(t *testing.T) {
acc := testutil.Accumulator{}
final := NewFinal()
tags := map[string]string{"foo": "bar"}
m1, _ := metric.New("m1",
tags,
map[string]interface{}{"a": int64(1)},
time.Unix(1530939936, 0))
m2, _ := metric.New("m1",
tags,
map[string]interface{}{"a": int64(2)},
time.Unix(1530939937, 0))
m3, _ := metric.New("m1",
tags,
map[string]interface{}{"a": int64(3)},
time.Unix(1530939938, 0))
final.Add(m1)
final.Add(m2)
final.Add(m3)
final.Push(&acc)
expected := []telegraf.Metric{
testutil.MustMetric(
"m1",
tags,
map[string]interface{}{
"a_final": 3,
},
time.Unix(1530939938, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestTwoTags(t *testing.T) {
acc := testutil.Accumulator{}
final := NewFinal()
tags1 := map[string]string{"foo": "bar"}
tags2 := map[string]string{"foo": "baz"}
m1, _ := metric.New("m1",
tags1,
map[string]interface{}{"a": int64(1)},
time.Unix(1530939936, 0))
m2, _ := metric.New("m1",
tags2,
map[string]interface{}{"a": int64(2)},
time.Unix(1530939937, 0))
m3, _ := metric.New("m1",
tags1,
map[string]interface{}{"a": int64(3)},
time.Unix(1530939938, 0))
final.Add(m1)
final.Add(m2)
final.Add(m3)
final.Push(&acc)
expected := []telegraf.Metric{
testutil.MustMetric(
"m1",
tags2,
map[string]interface{}{
"a_final": 2,
},
time.Unix(1530939937, 0),
),
testutil.MustMetric(
"m1",
tags1,
map[string]interface{}{
"a_final": 3,
},
time.Unix(1530939938, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}
func TestLongDifference(t *testing.T) {
acc := testutil.Accumulator{}
final := NewFinal()
final.SeriesTimeout = internal.Duration{Duration: 30 * time.Second}
tags := map[string]string{"foo": "bar"}
now := time.Now()
m1, _ := metric.New("m",
tags,
map[string]interface{}{"a": int64(1)},
now.Add(time.Second*-290))
m2, _ := metric.New("m",
tags,
map[string]interface{}{"a": int64(2)},
now.Add(time.Second*-275))
m3, _ := metric.New("m",
tags,
map[string]interface{}{"a": int64(3)},
now.Add(time.Second*-100))
m4, _ := metric.New("m",
tags,
map[string]interface{}{"a": int64(4)},
now.Add(time.Second*-20))
final.Add(m1)
final.Add(m2)
final.Push(&acc)
final.Add(m3)
final.Push(&acc)
final.Add(m4)
final.Push(&acc)
expected := []telegraf.Metric{
testutil.MustMetric(
"m",
tags,
map[string]interface{}{
"a_final": 2,
},
now.Add(time.Second*-275),
),
testutil.MustMetric(
"m",
tags,
map[string]interface{}{
"a_final": 3,
},
now.Add(time.Second*-100),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}