From a724bf487f7c46d5e5b97ea8abd02e2480643aa1 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 15 May 2019 14:46:28 -0700 Subject: [PATCH] Add final aggregator (#5820) --- Gopkg.lock | 2 + plugins/aggregators/all/all.go | 1 + plugins/aggregators/final/README.md | 48 ++++++++ plugins/aggregators/final/final.go | 72 ++++++++++++ plugins/aggregators/final/final_test.go | 144 ++++++++++++++++++++++++ testutil/metric.go | 87 +++++++++++++- testutil/metric_test.go | 52 ++++++++- 7 files changed, 402 insertions(+), 4 deletions(-) create mode 100644 plugins/aggregators/final/README.md create mode 100644 plugins/aggregators/final/final.go create mode 100644 plugins/aggregators/final/final_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 505bbaa39..76c5deb62 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -519,6 +519,7 @@ name = "github.com/google/go-cmp" packages = [ "cmp", + "cmp/cmpopts", "cmp/internal/diff", "cmp/internal/function", "cmp/internal/value", @@ -1585,6 +1586,7 @@ "github.com/golang/protobuf/ptypes/empty", "github.com/golang/protobuf/ptypes/timestamp", "github.com/google/go-cmp/cmp", + "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/go-github/github", "github.com/gorilla/mux", "github.com/harlow/kinesis-consumer", diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index ff1bbfc70..ec04c0aaf 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -2,6 +2,7 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/aggregators/basicstats" + _ "github.com/influxdata/telegraf/plugins/aggregators/final" _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" _ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter" diff --git a/plugins/aggregators/final/README.md b/plugins/aggregators/final/README.md new file mode 100644 index 000000000..444746d78 --- /dev/null +++ b/plugins/aggregators/final/README.md @@ -0,0 +1,48 @@ +# Final Aggregator Plugin + +The final aggregator emits the last metric of a contiguous series. A +contiguous series is defined as a series which receives updates within the +time period in `series_timeout`. The contiguous series may be longer than the +time interval defined by `period`. + +This is useful for getting the final value for data sources that produce +discrete time series such as procstat, cgroup, kubernetes etc. + +When a series has not been updated within the time defined in +`series_timeout`, the last metric is emitted with the `_final` appended. + +### Configuration + +```toml +[[aggregators.final]] + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The time that a series is not updated until considering it final. + series_timeout = "5m" +``` + +### Metrics + +Measurement and tags are unchanged, fields are emitted with the suffix +`_final`. + +### Example Output + +``` +counter,host=bar i_final=3,j_final=6 1554281635115090133 +counter,host=foo i_final=3,j_final=6 1554281635112992012 +``` + +Original input: +``` +counter,host=bar i=1,j=4 1554281633101153300 +counter,host=foo i=1,j=4 1554281633099323601 +counter,host=bar i=2,j=5 1554281634107980073 +counter,host=foo i=2,j=5 1554281634105931116 +counter,host=bar i=3,j=6 1554281635115090133 +counter,host=foo i=3,j=6 1554281635112992012 +``` diff --git a/plugins/aggregators/final/final.go b/plugins/aggregators/final/final.go new file mode 100644 index 000000000..53ad0a47c --- /dev/null +++ b/plugins/aggregators/final/final.go @@ -0,0 +1,72 @@ +package final + +import ( + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +var sampleConfig = ` + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The time that a series is not updated until considering it final. + series_timeout = "5m" +` + +type Final struct { + SeriesTimeout internal.Duration `toml:"series_timeout"` + + // The last metric for all series which are active + metricCache map[uint64]telegraf.Metric +} + +func NewFinal() *Final { + return &Final{ + SeriesTimeout: internal.Duration{Duration: 5 * time.Minute}, + metricCache: make(map[uint64]telegraf.Metric), + } +} + +func (m *Final) SampleConfig() string { + return sampleConfig +} + +func (m *Final) Description() string { + return "Report the final metric of a series" +} + +func (m *Final) Add(in telegraf.Metric) { + id := in.HashID() + m.metricCache[id] = in +} + +func (m *Final) Push(acc telegraf.Accumulator) { + // Preserve timestamp of original metric + acc.SetPrecision(time.Nanosecond) + + for id, metric := range m.metricCache { + if time.Since(metric.Time()) > m.SeriesTimeout.Duration { + fields := map[string]interface{}{} + for _, field := range metric.FieldList() { + fields[field.Key+"_final"] = field.Value + } + acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time()) + delete(m.metricCache, id) + } + } +} + +func (m *Final) Reset() { +} + +func init() { + aggregators.Add("final", func() telegraf.Aggregator { + return NewFinal() + }) +} diff --git a/plugins/aggregators/final/final_test.go b/plugins/aggregators/final/final_test.go new file mode 100644 index 000000000..1b3367fa5 --- /dev/null +++ b/plugins/aggregators/final/final_test.go @@ -0,0 +1,144 @@ +package final + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +func TestSimple(t *testing.T) { + acc := testutil.Accumulator{} + final := NewFinal() + + tags := map[string]string{"foo": "bar"} + m1, _ := metric.New("m1", + tags, + map[string]interface{}{"a": int64(1)}, + time.Unix(1530939936, 0)) + m2, _ := metric.New("m1", + tags, + map[string]interface{}{"a": int64(2)}, + time.Unix(1530939937, 0)) + m3, _ := metric.New("m1", + tags, + map[string]interface{}{"a": int64(3)}, + time.Unix(1530939938, 0)) + final.Add(m1) + final.Add(m2) + final.Add(m3) + final.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "m1", + tags, + map[string]interface{}{ + "a_final": 3, + }, + time.Unix(1530939938, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestTwoTags(t *testing.T) { + acc := testutil.Accumulator{} + final := NewFinal() + + tags1 := map[string]string{"foo": "bar"} + tags2 := map[string]string{"foo": "baz"} + + m1, _ := metric.New("m1", + tags1, + map[string]interface{}{"a": int64(1)}, + time.Unix(1530939936, 0)) + m2, _ := metric.New("m1", + tags2, + map[string]interface{}{"a": int64(2)}, + time.Unix(1530939937, 0)) + m3, _ := metric.New("m1", + tags1, + map[string]interface{}{"a": int64(3)}, + time.Unix(1530939938, 0)) + final.Add(m1) + final.Add(m2) + final.Add(m3) + final.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "m1", + tags2, + map[string]interface{}{ + "a_final": 2, + }, + time.Unix(1530939937, 0), + ), + testutil.MustMetric( + "m1", + tags1, + map[string]interface{}{ + "a_final": 3, + }, + time.Unix(1530939938, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics()) +} + +func TestLongDifference(t *testing.T) { + acc := testutil.Accumulator{} + final := NewFinal() + final.SeriesTimeout = internal.Duration{Duration: 30 * time.Second} + tags := map[string]string{"foo": "bar"} + + now := time.Now() + + m1, _ := metric.New("m", + tags, + map[string]interface{}{"a": int64(1)}, + now.Add(time.Second*-290)) + m2, _ := metric.New("m", + tags, + map[string]interface{}{"a": int64(2)}, + now.Add(time.Second*-275)) + m3, _ := metric.New("m", + tags, + map[string]interface{}{"a": int64(3)}, + now.Add(time.Second*-100)) + m4, _ := metric.New("m", + tags, + map[string]interface{}{"a": int64(4)}, + now.Add(time.Second*-20)) + final.Add(m1) + final.Add(m2) + final.Push(&acc) + final.Add(m3) + final.Push(&acc) + final.Add(m4) + final.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "m", + tags, + map[string]interface{}{ + "a_final": 2, + }, + now.Add(time.Second*-275), + ), + testutil.MustMetric( + "m", + tags, + map[string]interface{}{ + "a_final": 3, + }, + now.Add(time.Second*-100), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics()) +} diff --git a/testutil/metric.go b/testutil/metric.go index afb3de7fe..b92c724f1 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -1,11 +1,13 @@ package testutil import ( + "reflect" "sort" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) @@ -18,6 +20,77 @@ type metricDiff struct { Time time.Time } +func lessFunc(lhs, rhs *metricDiff) bool { + if lhs.Measurement != rhs.Measurement { + return lhs.Measurement < rhs.Measurement + } + + for i := 0; ; i++ { + if i >= len(lhs.Tags) && i >= len(rhs.Tags) { + break + } else if i >= len(lhs.Tags) { + return true + } else if i >= len(rhs.Tags) { + return false + } + + if lhs.Tags[i].Key != rhs.Tags[i].Key { + return lhs.Tags[i].Key < rhs.Tags[i].Key + } + if lhs.Tags[i].Value != rhs.Tags[i].Value { + return lhs.Tags[i].Value < rhs.Tags[i].Value + } + } + + for i := 0; ; i++ { + if i >= len(lhs.Fields) && i >= len(rhs.Fields) { + break + } else if i >= len(lhs.Fields) { + return true + } else if i >= len(rhs.Fields) { + return false + } + + if lhs.Fields[i].Key != rhs.Fields[i].Key { + return lhs.Fields[i].Key < rhs.Fields[i].Key + } + + if lhs.Fields[i].Value != rhs.Fields[i].Value { + ltype := reflect.TypeOf(lhs.Fields[i].Value) + rtype := reflect.TypeOf(lhs.Fields[i].Value) + + if ltype.Kind() != rtype.Kind() { + return ltype.Kind() < rtype.Kind() + } + + switch v := lhs.Fields[i].Value.(type) { + case int64: + return v < lhs.Fields[i].Value.(int64) + case uint64: + return v < lhs.Fields[i].Value.(uint64) + case float64: + return v < lhs.Fields[i].Value.(float64) + case string: + return v < lhs.Fields[i].Value.(string) + case bool: + return !v + default: + panic("unknown type") + } + } + } + + if lhs.Type != rhs.Type { + return lhs.Type < rhs.Type + } + + if lhs.Time.UnixNano() != rhs.Time.UnixNano() { + return lhs.Time.UnixNano() < rhs.Time.UnixNano() + } + + return false +} + func newMetricDiff(metric telegraf.Metric) *metricDiff { if metric == nil { return nil @@ -45,6 +118,12 @@ func newMetricDiff(metric telegraf.Metric) *metricDiff { return m } +// SortMetrics enables sorting metrics before comparison. +func SortMetrics() cmp.Option { + return cmpopts.SortSlices(lessFunc) +} + +// MetricEqual returns true if the metrics are equal. func MetricEqual(expected, actual telegraf.Metric) bool { var lhs, rhs *metricDiff if expected != nil { @@ -57,6 +136,8 @@ func MetricEqual(expected, actual telegraf.Metric) bool { return cmp.Equal(lhs, rhs) } +// RequireMetricEqual halts the test with an error if the metrics are not +// equal. func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { t.Helper() @@ -73,7 +154,9 @@ func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { } } -func RequireMetricsEqual(t *testing.T, expected, actual []telegraf.Metric) { +// RequireMetricsEqual halts the test with an error if the array of metrics +// are not equal. +func RequireMetricsEqual(t *testing.T, expected, actual []telegraf.Metric, opts ...cmp.Option) { t.Helper() lhs := make([]*metricDiff, 0, len(expected)) @@ -84,7 +167,7 @@ func RequireMetricsEqual(t *testing.T, expected, actual []telegraf.Metric) { for _, m := range actual { rhs = append(rhs, newMetricDiff(m)) } - if diff := cmp.Diff(lhs, rhs); diff != "" { + if diff := cmp.Diff(lhs, rhs, opts...); diff != "" { t.Fatalf("[]telegraf.Metric\n--- expected\n+++ actual\n%s", diff) } } diff --git a/testutil/metric_test.go b/testutil/metric_test.go index 5b5ef01f4..0c999185a 100644 --- a/testutil/metric_test.go +++ b/testutil/metric_test.go @@ -4,18 +4,19 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) -func TestRequireMetricsEqual(t *testing.T) { +func TestRequireMetricEqual(t *testing.T) { tests := []struct { name string got telegraf.Metric want telegraf.Metric }{ { - name: "telegraf and testutil metrics should be equal", + name: "equal metrics should be equal", got: func() telegraf.Metric { m, _ := metric.New( "test", @@ -56,3 +57,50 @@ func TestRequireMetricsEqual(t *testing.T) { }) } } + +func TestRequireMetricsEqual(t *testing.T) { + tests := []struct { + name string + got []telegraf.Metric + want []telegraf.Metric + opts []cmp.Option + }{ + { + name: "sort metrics option sorts by name", + got: []telegraf.Metric{ + MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + MustMetric( + "net", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + want: []telegraf.Metric{ + MustMetric( + "net", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + opts: []cmp.Option{SortMetrics()}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + RequireMetricsEqual(t, tt.want, tt.got, tt.opts...) + }) + } +}