Add sum stat to basicstats aggregator (#3797)

This commit is contained in:
Carl Pacey 2018-03-05 18:26:31 -05:00 committed by Daniel Nelson
parent 491b27adbb
commit 54f4a5a502
3 changed files with 170 additions and 5 deletions

View File

@ -1,6 +1,6 @@
# BasicStats Aggregator Plugin # BasicStats Aggregator Plugin
The BasicStats aggregator plugin give us count,max,min,mean,s2(variance), stdev for a set of values, The BasicStats aggregator plugin give us count,max,min,mean,sum,s2(variance), stdev for a set of values,
emitting the aggregate every `period` seconds. emitting the aggregate every `period` seconds.
### Configuration: ### Configuration:
@ -21,11 +21,11 @@ emitting the aggregate every `period` seconds.
## BasicStats Arguments: ## BasicStats Arguments:
## Configures which basic stats to push as fields ## Configures which basic stats to push as fields
stats = ["count","min","max","mean","stdev","s2"] stats = ["count","min","max","mean","stdev","s2","sum"]
``` ```
- stats - stats
- If not specified, all stats are aggregated and pushed as fields - If not specified, then `count`, `min`, `max`, `mean`, `stdev`, and `s2` are aggregated and pushed as fields. `sum` is not aggregated by default to maintain backwards compatibility.
- If empty array, no stats are aggregated - If empty array, no stats are aggregated
### Measurements & Fields: ### Measurements & Fields:
@ -35,6 +35,7 @@ emitting the aggregate every `period` seconds.
- field1_max - field1_max
- field1_min - field1_min
- field1_mean - field1_mean
- field1_sum
- field1_s2 (variance) - field1_s2 (variance)
- field1_stdev (standard deviation) - field1_stdev (standard deviation)
@ -48,8 +49,8 @@ No tags are applied by this aggregator.
$ telegraf --config telegraf.conf --quiet $ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000 system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000 system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000 system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1=1 1475584020000000000 system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000 system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414162 1475584010000000000 system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000
``` ```

View File

@ -22,6 +22,7 @@ type configuredStats struct {
mean bool mean bool
variance bool variance bool
stdev bool stdev bool
sum bool
} }
func NewBasicStats() *BasicStats { func NewBasicStats() *BasicStats {
@ -40,6 +41,7 @@ type basicstats struct {
count float64 count float64
min float64 min float64
max float64 max float64
sum float64
mean float64 mean float64
M2 float64 //intermedia value for variance/stdev M2 float64 //intermedia value for variance/stdev
} }
@ -77,6 +79,7 @@ func (m *BasicStats) Add(in telegraf.Metric) {
min: fv, min: fv,
max: fv, max: fv,
mean: fv, mean: fv,
sum: fv,
M2: 0.0, M2: 0.0,
} }
} }
@ -92,6 +95,7 @@ func (m *BasicStats) Add(in telegraf.Metric) {
min: fv, min: fv,
max: fv, max: fv,
mean: fv, mean: fv,
sum: fv,
M2: 0.0, M2: 0.0,
} }
continue continue
@ -119,6 +123,8 @@ func (m *BasicStats) Add(in telegraf.Metric) {
} else if fv > tmp.max { } else if fv > tmp.max {
tmp.max = fv tmp.max = fv
} }
//sum compute
tmp.sum += fv
//store final data //store final data
m.cache[id].fields[k] = tmp m.cache[id].fields[k] = tmp
} }
@ -146,6 +152,9 @@ func (m *BasicStats) Push(acc telegraf.Accumulator) {
if config.mean { if config.mean {
fields[k+"_mean"] = v.mean fields[k+"_mean"] = v.mean
} }
if config.sum {
fields[k+"_sum"] = v.sum
}
//v.count always >=1 //v.count always >=1
if v.count > 1 { if v.count > 1 {
@ -187,6 +196,8 @@ func parseStats(names []string) *configuredStats {
parsed.variance = true parsed.variance = true
case "stdev": case "stdev":
parsed.stdev = true parsed.stdev = true
case "sum":
parsed.sum = true
default: default:
log.Printf("W! Unrecognized basic stat '%s', ignoring", name) log.Printf("W! Unrecognized basic stat '%s', ignoring", name)
@ -206,6 +217,7 @@ func defaultStats() *configuredStats {
defaults.mean = true defaults.mean = true
defaults.variance = true defaults.variance = true
defaults.stdev = true defaults.stdev = true
defaults.sum = false
return defaults return defaults
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
) )
var m1, _ = metric.New("m1", var m1, _ = metric.New("m1",
@ -250,6 +251,83 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
} }
// Test only aggregating sum
func TestBasicStatsWithOnlySum(t *testing.T) {
aggregator := NewBasicStats()
aggregator.Stats = []string{"sum"}
aggregator.Add(m1)
aggregator.Add(m2)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_sum": float64(2),
"b_sum": float64(4),
"c_sum": float64(6),
"d_sum": float64(8),
"e_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Verify that sum doesn't suffer from floating point errors. Early
// implementations of sum were calulated from mean and count, which
// e.g. summed "1, 1, 5, 1" as "7.999999..." instead of 8.
func TestBasicStatsWithOnlySumFloatingPointErrata(t *testing.T) {
var sum1, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)
var sum2, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)
var sum3, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(5),
},
time.Now(),
)
var sum4, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)
aggregator := NewBasicStats()
aggregator.Stats = []string{"sum"}
aggregator.Add(sum1)
aggregator.Add(sum2)
aggregator.Add(sum3)
aggregator.Add(sum4)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_sum": float64(8),
}
expectedTags := map[string]string{}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test only aggregating variance // Test only aggregating variance
func TestBasicStatsWithOnlyVariance(t *testing.T) { func TestBasicStatsWithOnlyVariance(t *testing.T) {
@ -328,6 +406,57 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
} }
// Test aggregating with all stats
func TestBasicStatsWithAllStats(t *testing.T) {
acc := testutil.Accumulator{}
minmax := NewBasicStats()
minmax.Stats = []string{"count", "min", "max", "mean", "stdev", "s2", "sum"}
minmax.Add(m1)
minmax.Add(m2)
minmax.Push(&acc)
expectedFields := map[string]interface{}{
"a_count": float64(2), //a
"a_max": float64(1),
"a_min": float64(1),
"a_mean": float64(1),
"a_stdev": float64(0),
"a_s2": float64(0),
"a_sum": float64(2),
"b_count": float64(2), //b
"b_max": float64(3),
"b_min": float64(1),
"b_mean": float64(2),
"b_s2": float64(2),
"b_sum": float64(4),
"b_stdev": math.Sqrt(2),
"c_count": float64(2), //c
"c_max": float64(4),
"c_min": float64(2),
"c_mean": float64(3),
"c_s2": float64(2),
"c_stdev": math.Sqrt(2),
"c_sum": float64(6),
"d_count": float64(2), //d
"d_max": float64(6),
"d_min": float64(2),
"d_mean": float64(4),
"d_s2": float64(8),
"d_stdev": math.Sqrt(8),
"d_sum": float64(8),
"e_count": float64(1), //e
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"e_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test that if an empty array is passed, no points are pushed // Test that if an empty array is passed, no points are pushed
func TestBasicStatsWithNoStats(t *testing.T) { func TestBasicStatsWithNoStats(t *testing.T) {
@ -357,3 +486,26 @@ func TestBasicStatsWithUnknownStat(t *testing.T) {
acc.AssertDoesNotContainMeasurement(t, "m1") acc.AssertDoesNotContainMeasurement(t, "m1")
} }
// Test that if Stats isn't supplied, then we only do count, min, max, mean,
// stdev, and s2. We purposely exclude sum for backwards compatability,
// otherwise user's working systems will suddenly (and surprisingly) start
// capturing sum without their input.
func TestBasicStatsWithDefaultStats(t *testing.T) {
aggregator := NewBasicStats()
aggregator.Add(m1)
aggregator.Add(m2)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
assert.True(t, acc.HasField("m1", "a_count"))
assert.True(t, acc.HasField("m1", "a_min"))
assert.True(t, acc.HasField("m1", "a_max"))
assert.True(t, acc.HasField("m1", "a_mean"))
assert.True(t, acc.HasField("m1", "a_stdev"))
assert.True(t, acc.HasField("m1", "a_s2"))
assert.False(t, acc.HasField("m1", "a_sum"))
}