diff --git a/plugins/aggregators/basicstats/README.md b/plugins/aggregators/basicstats/README.md index 0e3e5558c..f96dfa134 100644 --- a/plugins/aggregators/basicstats/README.md +++ b/plugins/aggregators/basicstats/README.md @@ -8,14 +8,26 @@ emitting the aggregate every `period` seconds. ```toml # Keep the aggregate basicstats of each metric passing through. [[aggregators.basicstats]] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. period = "30s" + ## If true, the original metric will be dropped by the ## aggregator and will not get sent to the output plugins. drop_original = false + + ## BasicStats Arguments: + + ## Configures which basic stats to push as fields + stats = ["count","min","max","mean","stdev","s2"] ``` +- stats + - If not specified, all stats are aggregated and pushed as fields + - If empty array, no stats are aggregated + ### Measurements & Fields: - measurement1 diff --git a/plugins/aggregators/basicstats/basicstats.go b/plugins/aggregators/basicstats/basicstats.go index 40d65c873..4ad241e91 100644 --- a/plugins/aggregators/basicstats/basicstats.go +++ b/plugins/aggregators/basicstats/basicstats.go @@ -1,6 +1,7 @@ package basicstats import ( + "log" "math" "github.com/influxdata/telegraf" @@ -8,10 +9,22 @@ import ( ) type BasicStats struct { - cache map[uint64]aggregate + Stats []string `toml:"stats"` + + cache map[uint64]aggregate + statsConfig *configuredStats } -func NewBasicStats() telegraf.Aggregator { +type configuredStats struct { + count bool + min bool + max bool + mean bool + variance bool + stdev bool +} + +func NewBasicStats() *BasicStats { mm := &BasicStats{} mm.Reset() return mm @@ -114,25 +127,103 @@ func (m *BasicStats) Add(in telegraf.Metric) { } func (m *BasicStats) Push(acc telegraf.Accumulator) { + + config := getConfiguredStats(m) + for _, aggregate := range m.cache { fields := map[string]interface{}{} for k, v := range aggregate.fields { - fields[k+"_count"] = v.count - fields[k+"_min"] = v.min - fields[k+"_max"] = v.max - fields[k+"_mean"] = v.mean + + if config.count { + fields[k+"_count"] = v.count + } + if config.min { + fields[k+"_min"] = v.min + } + if config.max { + fields[k+"_max"] = v.max + } + if config.mean { + fields[k+"_mean"] = v.mean + } + //v.count always >=1 if v.count > 1 { variance := v.M2 / (v.count - 1) - fields[k+"_s2"] = variance - fields[k+"_stdev"] = math.Sqrt(variance) + + if config.variance { + fields[k+"_s2"] = variance + } + if config.stdev { + fields[k+"_stdev"] = math.Sqrt(variance) + } } //if count == 1 StdDev = infinite => so I won't send data } - acc.AddFields(aggregate.name, fields, aggregate.tags) + + if len(fields) > 0 { + acc.AddFields(aggregate.name, fields, aggregate.tags) + } } } +func parseStats(names []string) *configuredStats { + + parsed := &configuredStats{} + + for _, name := range names { + + switch name { + + case "count": + parsed.count = true + case "min": + parsed.min = true + case "max": + parsed.max = true + case "mean": + parsed.mean = true + case "s2": + parsed.variance = true + case "stdev": + parsed.stdev = true + + default: + log.Printf("W! Unrecognized basic stat '%s', ignoring", name) + } + } + + return parsed +} + +func defaultStats() *configuredStats { + + defaults := &configuredStats{} + + defaults.count = true + defaults.min = true + defaults.max = true + defaults.mean = true + defaults.variance = true + defaults.stdev = true + + return defaults +} + +func getConfiguredStats(m *BasicStats) *configuredStats { + + if m.statsConfig == nil { + + if m.Stats == nil { + m.statsConfig = defaultStats() + } else { + m.statsConfig = parseStats(m.Stats) + } + } + + return m.statsConfig +} + func (m *BasicStats) Reset() { m.cache = make(map[uint64]aggregate) } diff --git a/plugins/aggregators/basicstats/basicstats_test.go b/plugins/aggregators/basicstats/basicstats_test.go index 74237c0c7..d2642b568 100644 --- a/plugins/aggregators/basicstats/basicstats_test.go +++ b/plugins/aggregators/basicstats/basicstats_test.go @@ -149,3 +149,211 @@ func TestBasicStatsDifferentPeriods(t *testing.T) { } acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) } + +// Test only aggregating count +func TestBasicStatsWithOnlyCount(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"count"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_count": float64(2), + "b_count": float64(2), + "c_count": float64(2), + "d_count": float64(2), + "e_count": float64(1), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating minimum +func TestBasicStatsWithOnlyMin(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"min"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_min": float64(1), + "b_min": float64(1), + "c_min": float64(2), + "d_min": float64(2), + "e_min": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating maximum +func TestBasicStatsWithOnlyMax(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"max"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_max": float64(1), + "b_max": float64(3), + "c_max": float64(4), + "d_max": float64(6), + "e_max": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating mean +func TestBasicStatsWithOnlyMean(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"mean"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_mean": float64(1), + "b_mean": float64(2), + "c_mean": float64(3), + "d_mean": float64(4), + "e_mean": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating variance +func TestBasicStatsWithOnlyVariance(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"s2"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_s2": float64(0), + "b_s2": float64(2), + "c_s2": float64(2), + "d_s2": float64(8), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating standard deviation +func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"stdev"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_stdev": float64(0), + "b_stdev": math.Sqrt(2), + "c_stdev": math.Sqrt(2), + "d_stdev": math.Sqrt(8), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test only aggregating minimum and maximum +func TestBasicStatsWithMinAndMax(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"min", "max"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_max": float64(1), //a + "a_min": float64(1), + "b_max": float64(3), //b + "b_min": float64(1), + "c_max": float64(4), //c + "c_min": float64(2), + "d_max": float64(6), //d + "d_min": float64(2), + "e_max": float64(200), //e + "e_min": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test that if an empty array is passed, no points are pushed +func TestBasicStatsWithNoStats(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + acc.AssertDoesNotContainMeasurement(t, "m1") +} + +// Test that if an unknown stat is configured, it doesn't explode +func TestBasicStatsWithUnknownStat(t *testing.T) { + + aggregator := NewBasicStats() + aggregator.Stats = []string{"crazy"} + + aggregator.Add(m1) + aggregator.Add(m2) + + acc := testutil.Accumulator{} + aggregator.Push(&acc) + + acc.AssertDoesNotContainMeasurement(t, "m1") +}