From b641f06552cdc7a1ce6cac8c8080af04f56da70a Mon Sep 17 00:00:00 2001 From: Toni Moreno Date: Tue, 10 Oct 2017 21:02:01 +0200 Subject: [PATCH] Add new basicstats aggregator (#2167) --- plugins/aggregators/all/all.go | 1 + plugins/aggregators/basicstats/README.md | 43 +++++ plugins/aggregators/basicstats/basicstats.go | 155 ++++++++++++++++++ .../aggregators/basicstats/basicstats_test.go | 151 +++++++++++++++++ 4 files changed, 350 insertions(+) create mode 100644 plugins/aggregators/basicstats/README.md create mode 100644 plugins/aggregators/basicstats/basicstats.go create mode 100644 plugins/aggregators/basicstats/basicstats_test.go diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index c4d430cc9..98aecb83f 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/aggregators/basicstats" _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" ) diff --git a/plugins/aggregators/basicstats/README.md b/plugins/aggregators/basicstats/README.md new file mode 100644 index 000000000..0e3e5558c --- /dev/null +++ b/plugins/aggregators/basicstats/README.md @@ -0,0 +1,43 @@ +# BasicStats Aggregator Plugin + +The BasicStats aggregator plugin gives us count, max, min, mean, s2 (variance), and stdev for a set of values, +emitting the aggregate every `period` seconds. + +### Configuration: + +```toml +# Keep the aggregate basicstats of each metric passing through. +[[aggregators.basicstats]] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. 
+ drop_original = false +``` + +### Measurements & Fields: + +- measurement1 + - field1_count + - field1_max + - field1_min + - field1_mean + - field1_s2 (variance) + - field1_stdev (standard deviation) + +### Tags: + +No tags are applied by this aggregator. + +### Example Output: + +``` +$ telegraf --config telegraf.conf --quiet +system,host=tars load1=1 1475583980000000000 +system,host=tars load1=1 1475583990000000000 +system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000 +system,host=tars load1=1 1475584020000000000 +system,host=tars load1=3 1475584030000000000 +system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414214 1475584040000000000 +``` diff --git a/plugins/aggregators/basicstats/basicstats.go b/plugins/aggregators/basicstats/basicstats.go new file mode 100644 index 000000000..40d65c873 --- /dev/null +++ b/plugins/aggregators/basicstats/basicstats.go @@ -0,0 +1,155 @@ +package basicstats + +import ( + "math" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +type BasicStats struct { + cache map[uint64]aggregate +} + +func NewBasicStats() telegraf.Aggregator { + mm := &BasicStats{} + mm.Reset() + return mm +} + +type aggregate struct { + fields map[string]basicstats + name string + tags map[string]string +} + +type basicstats struct { + count float64 + min float64 + max float64 + mean float64 + M2 float64 //intermediate value for variance/stdev +} + +var sampleConfig = ` + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false +` + +func (m *BasicStats) SampleConfig() string { + return sampleConfig +} + +func (m *BasicStats) Description() string { + return "Keep the aggregate basicstats of each metric passing through." 
} + +func (m *BasicStats) Add(in telegraf.Metric) { + id := in.HashID() + if _, ok := m.cache[id]; !ok { + // hit an uncached metric, create caches for first time: + a := aggregate{ + name: in.Name(), + tags: in.Tags(), + fields: make(map[string]basicstats), + } + for k, v := range in.Fields() { + if fv, ok := convert(v); ok { + a.fields[k] = basicstats{ + count: 1, + min: fv, + max: fv, + mean: fv, + M2: 0.0, + } + } + } + m.cache[id] = a + } else { + for k, v := range in.Fields() { + if fv, ok := convert(v); ok { + if _, ok := m.cache[id].fields[k]; !ok { + // hit an uncached field of a cached metric + m.cache[id].fields[k] = basicstats{ + count: 1, + min: fv, + max: fv, + mean: fv, + M2: 0.0, + } + continue + } + + tmp := m.cache[id].fields[k] + //https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance + //variable initialization + x := fv + mean := tmp.mean + M2 := tmp.M2 + //counter compute + n := tmp.count + 1 + tmp.count = n + //mean compute + delta := x - mean + mean = mean + delta/n + tmp.mean = mean + //variance/stdev compute + M2 = M2 + delta*(x-mean) + tmp.M2 = M2 + //max/min compute + if fv < tmp.min { + tmp.min = fv + } else if fv > tmp.max { + tmp.max = fv + } + //store final data + m.cache[id].fields[k] = tmp + } + } + } +} + +func (m *BasicStats) Push(acc telegraf.Accumulator) { + for _, aggregate := range m.cache { + fields := map[string]interface{}{} + for k, v := range aggregate.fields { + fields[k+"_count"] = v.count + fields[k+"_min"] = v.min + fields[k+"_max"] = v.max + fields[k+"_mean"] = v.mean + //v.count always >=1 + if v.count > 1 { + variance := v.M2 / (v.count - 1) + fields[k+"_s2"] = variance + fields[k+"_stdev"] = math.Sqrt(variance) + } + //if count == 1 the sample variance/stdev is undefined (0/0) => so I won't send data + } + acc.AddFields(aggregate.name, fields, aggregate.tags) + } +} + +func (m *BasicStats) Reset() { + m.cache = make(map[uint64]aggregate) +} + +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { + case 
float64: + return v, true + case int64: + return float64(v), true + default: + return 0, false + } +} + +func init() { + aggregators.Add("basicstats", func() telegraf.Aggregator { + return NewBasicStats() + }) +} diff --git a/plugins/aggregators/basicstats/basicstats_test.go b/plugins/aggregators/basicstats/basicstats_test.go new file mode 100644 index 000000000..74237c0c7 --- /dev/null +++ b/plugins/aggregators/basicstats/basicstats_test.go @@ -0,0 +1,151 @@ +package basicstats + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +var m1, _ = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(1), + "c": float64(2), + "d": float64(2), + }, + time.Now(), +) +var m2, _ = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(3), + "c": float64(4), + "d": float64(6), + "e": float64(200), + "ignoreme": "string", + "andme": true, + }, + time.Now(), +) + +func BenchmarkApply(b *testing.B) { + minmax := NewBasicStats() + + for n := 0; n < b.N; n++ { + minmax.Add(m1) + minmax.Add(m2) + } +} + +// Test two metrics getting added. 
+func TestBasicStatsWithPeriod(t *testing.T) { + acc := testutil.Accumulator{} + minmax := NewBasicStats() + + minmax.Add(m1) + minmax.Add(m2) + minmax.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_count": float64(2), //a + "a_max": float64(1), + "a_min": float64(1), + "a_mean": float64(1), + "a_stdev": float64(0), + "a_s2": float64(0), + "b_count": float64(2), //b + "b_max": float64(3), + "b_min": float64(1), + "b_mean": float64(2), + "b_s2": float64(2), + "b_stdev": math.Sqrt(2), + "c_count": float64(2), //c + "c_max": float64(4), + "c_min": float64(2), + "c_mean": float64(3), + "c_s2": float64(2), + "c_stdev": math.Sqrt(2), + "d_count": float64(2), //d + "d_max": float64(6), + "d_min": float64(2), + "d_mean": float64(4), + "d_s2": float64(8), + "d_stdev": math.Sqrt(8), + "e_count": float64(1), //e + "e_max": float64(200), + "e_min": float64(200), + "e_mean": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test two metrics getting added with a push/reset in between (simulates +// getting added in different periods.) 
+func TestBasicStatsDifferentPeriods(t *testing.T) { + acc := testutil.Accumulator{} + minmax := NewBasicStats() + + minmax.Add(m1) + minmax.Push(&acc) + expectedFields := map[string]interface{}{ + "a_count": float64(1), //a + "a_max": float64(1), + "a_min": float64(1), + "a_mean": float64(1), + "b_count": float64(1), //b + "b_max": float64(1), + "b_min": float64(1), + "b_mean": float64(1), + "c_count": float64(1), //c + "c_max": float64(2), + "c_min": float64(2), + "c_mean": float64(2), + "d_count": float64(1), //d + "d_max": float64(2), + "d_min": float64(2), + "d_mean": float64(2), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) + + acc.ClearMetrics() + minmax.Reset() + minmax.Add(m2) + minmax.Push(&acc) + expectedFields = map[string]interface{}{ + "a_count": float64(1), //a + "a_max": float64(1), + "a_min": float64(1), + "a_mean": float64(1), + "b_count": float64(1), //b + "b_max": float64(3), + "b_min": float64(3), + "b_mean": float64(3), + "c_count": float64(1), //c + "c_max": float64(4), + "c_min": float64(4), + "c_mean": float64(4), + "d_count": float64(1), //d + "d_max": float64(6), + "d_min": float64(6), + "d_mean": float64(6), + "e_count": float64(1), //e + "e_max": float64(200), + "e_min": float64(200), + "e_mean": float64(200), + } + expectedTags = map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +}