Add control over which stats to gather in basicstats aggregator (#3580)
This commit is contained in:
parent
dfb68c5810
commit
e1bc191f9f
|
@ -8,14 +8,26 @@ emitting the aggregate every `period` seconds.
|
||||||
```toml
|
```toml
|
||||||
# Keep the aggregate basicstats of each metric passing through.
|
# Keep the aggregate basicstats of each metric passing through.
|
||||||
[[aggregators.basicstats]]
|
[[aggregators.basicstats]]
|
||||||
|
|
||||||
## General Aggregator Arguments:
|
## General Aggregator Arguments:
|
||||||
|
|
||||||
## The period on which to flush & clear the aggregator.
|
## The period on which to flush & clear the aggregator.
|
||||||
period = "30s"
|
period = "30s"
|
||||||
|
|
||||||
## If true, the original metric will be dropped by the
|
## If true, the original metric will be dropped by the
|
||||||
## aggregator and will not get sent to the output plugins.
|
## aggregator and will not get sent to the output plugins.
|
||||||
drop_original = false
|
drop_original = false
|
||||||
|
|
||||||
|
## BasicStats Arguments:
|
||||||
|
|
||||||
|
## Configures which basic stats to push as fields
|
||||||
|
stats = ["count","min","max","mean","stdev","s2"]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
- stats
|
||||||
|
- If not specified, all stats are aggregated and pushed as fields
|
||||||
|
- If empty array, no stats are aggregated
|
||||||
|
|
||||||
### Measurements & Fields:
|
### Measurements & Fields:
|
||||||
|
|
||||||
- measurement1
|
- measurement1
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package basicstats
|
package basicstats
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
@ -8,10 +9,22 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type BasicStats struct {
|
type BasicStats struct {
|
||||||
cache map[uint64]aggregate
|
Stats []string `toml:"stats"`
|
||||||
|
|
||||||
|
cache map[uint64]aggregate
|
||||||
|
statsConfig *configuredStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBasicStats() telegraf.Aggregator {
|
type configuredStats struct {
|
||||||
|
count bool
|
||||||
|
min bool
|
||||||
|
max bool
|
||||||
|
mean bool
|
||||||
|
variance bool
|
||||||
|
stdev bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBasicStats() *BasicStats {
|
||||||
mm := &BasicStats{}
|
mm := &BasicStats{}
|
||||||
mm.Reset()
|
mm.Reset()
|
||||||
return mm
|
return mm
|
||||||
|
@ -114,25 +127,103 @@ func (m *BasicStats) Add(in telegraf.Metric) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *BasicStats) Push(acc telegraf.Accumulator) {
|
func (m *BasicStats) Push(acc telegraf.Accumulator) {
|
||||||
|
|
||||||
|
config := getConfiguredStats(m)
|
||||||
|
|
||||||
for _, aggregate := range m.cache {
|
for _, aggregate := range m.cache {
|
||||||
fields := map[string]interface{}{}
|
fields := map[string]interface{}{}
|
||||||
for k, v := range aggregate.fields {
|
for k, v := range aggregate.fields {
|
||||||
fields[k+"_count"] = v.count
|
|
||||||
fields[k+"_min"] = v.min
|
if config.count {
|
||||||
fields[k+"_max"] = v.max
|
fields[k+"_count"] = v.count
|
||||||
fields[k+"_mean"] = v.mean
|
}
|
||||||
|
if config.min {
|
||||||
|
fields[k+"_min"] = v.min
|
||||||
|
}
|
||||||
|
if config.max {
|
||||||
|
fields[k+"_max"] = v.max
|
||||||
|
}
|
||||||
|
if config.mean {
|
||||||
|
fields[k+"_mean"] = v.mean
|
||||||
|
}
|
||||||
|
|
||||||
//v.count always >=1
|
//v.count always >=1
|
||||||
if v.count > 1 {
|
if v.count > 1 {
|
||||||
variance := v.M2 / (v.count - 1)
|
variance := v.M2 / (v.count - 1)
|
||||||
fields[k+"_s2"] = variance
|
|
||||||
fields[k+"_stdev"] = math.Sqrt(variance)
|
if config.variance {
|
||||||
|
fields[k+"_s2"] = variance
|
||||||
|
}
|
||||||
|
if config.stdev {
|
||||||
|
fields[k+"_stdev"] = math.Sqrt(variance)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//if count == 1 StdDev = infinite => so I won't send data
|
//if count == 1 StdDev = infinite => so I won't send data
|
||||||
}
|
}
|
||||||
acc.AddFields(aggregate.name, fields, aggregate.tags)
|
|
||||||
|
if len(fields) > 0 {
|
||||||
|
acc.AddFields(aggregate.name, fields, aggregate.tags)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseStats(names []string) *configuredStats {
|
||||||
|
|
||||||
|
parsed := &configuredStats{}
|
||||||
|
|
||||||
|
for _, name := range names {
|
||||||
|
|
||||||
|
switch name {
|
||||||
|
|
||||||
|
case "count":
|
||||||
|
parsed.count = true
|
||||||
|
case "min":
|
||||||
|
parsed.min = true
|
||||||
|
case "max":
|
||||||
|
parsed.max = true
|
||||||
|
case "mean":
|
||||||
|
parsed.mean = true
|
||||||
|
case "s2":
|
||||||
|
parsed.variance = true
|
||||||
|
case "stdev":
|
||||||
|
parsed.stdev = true
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Printf("W! Unrecognized basic stat '%s', ignoring", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsed
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultStats() *configuredStats {
|
||||||
|
|
||||||
|
defaults := &configuredStats{}
|
||||||
|
|
||||||
|
defaults.count = true
|
||||||
|
defaults.min = true
|
||||||
|
defaults.max = true
|
||||||
|
defaults.mean = true
|
||||||
|
defaults.variance = true
|
||||||
|
defaults.stdev = true
|
||||||
|
|
||||||
|
return defaults
|
||||||
|
}
|
||||||
|
|
||||||
|
func getConfiguredStats(m *BasicStats) *configuredStats {
|
||||||
|
|
||||||
|
if m.statsConfig == nil {
|
||||||
|
|
||||||
|
if m.Stats == nil {
|
||||||
|
m.statsConfig = defaultStats()
|
||||||
|
} else {
|
||||||
|
m.statsConfig = parseStats(m.Stats)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.statsConfig
|
||||||
|
}
|
||||||
|
|
||||||
func (m *BasicStats) Reset() {
|
func (m *BasicStats) Reset() {
|
||||||
m.cache = make(map[uint64]aggregate)
|
m.cache = make(map[uint64]aggregate)
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,3 +149,211 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
|
||||||
}
|
}
|
||||||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test only aggregating count
|
||||||
|
func TestBasicStatsWithOnlyCount(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"count"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_count": float64(2),
|
||||||
|
"b_count": float64(2),
|
||||||
|
"c_count": float64(2),
|
||||||
|
"d_count": float64(2),
|
||||||
|
"e_count": float64(1),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating minimum
|
||||||
|
func TestBasicStatsWithOnlyMin(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"min"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_min": float64(1),
|
||||||
|
"b_min": float64(1),
|
||||||
|
"c_min": float64(2),
|
||||||
|
"d_min": float64(2),
|
||||||
|
"e_min": float64(200),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating maximum
|
||||||
|
func TestBasicStatsWithOnlyMax(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"max"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_max": float64(1),
|
||||||
|
"b_max": float64(3),
|
||||||
|
"c_max": float64(4),
|
||||||
|
"d_max": float64(6),
|
||||||
|
"e_max": float64(200),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating mean
|
||||||
|
func TestBasicStatsWithOnlyMean(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"mean"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_mean": float64(1),
|
||||||
|
"b_mean": float64(2),
|
||||||
|
"c_mean": float64(3),
|
||||||
|
"d_mean": float64(4),
|
||||||
|
"e_mean": float64(200),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating variance
|
||||||
|
func TestBasicStatsWithOnlyVariance(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"s2"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_s2": float64(0),
|
||||||
|
"b_s2": float64(2),
|
||||||
|
"c_s2": float64(2),
|
||||||
|
"d_s2": float64(8),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating standard deviation
|
||||||
|
func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"stdev"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_stdev": float64(0),
|
||||||
|
"b_stdev": math.Sqrt(2),
|
||||||
|
"c_stdev": math.Sqrt(2),
|
||||||
|
"d_stdev": math.Sqrt(8),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test only aggregating minimum and maximum
|
||||||
|
func TestBasicStatsWithMinAndMax(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"min", "max"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"a_max": float64(1), //a
|
||||||
|
"a_min": float64(1),
|
||||||
|
"b_max": float64(3), //b
|
||||||
|
"b_min": float64(1),
|
||||||
|
"c_max": float64(4), //c
|
||||||
|
"c_min": float64(2),
|
||||||
|
"d_max": float64(6), //d
|
||||||
|
"d_min": float64(2),
|
||||||
|
"e_max": float64(200), //e
|
||||||
|
"e_min": float64(200),
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that if an empty array is passed, no points are pushed
|
||||||
|
func TestBasicStatsWithNoStats(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
acc.AssertDoesNotContainMeasurement(t, "m1")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that if an unknown stat is configured, it doesn't explode
|
||||||
|
func TestBasicStatsWithUnknownStat(t *testing.T) {
|
||||||
|
|
||||||
|
aggregator := NewBasicStats()
|
||||||
|
aggregator.Stats = []string{"crazy"}
|
||||||
|
|
||||||
|
aggregator.Add(m1)
|
||||||
|
aggregator.Add(m2)
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
aggregator.Push(&acc)
|
||||||
|
|
||||||
|
acc.AssertDoesNotContainMeasurement(t, "m1")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue