Add control over which stats to gather in basicstats aggregator (#3580)
This commit is contained in:
		
							parent
							
								
									4f1ea13ebf
								
							
						
					
					
						commit
						fcc9c82d34
					
				|  | @ -8,14 +8,26 @@ emitting the aggregate every `period` seconds. | |||
| ```toml | ||||
| # Keep the aggregate basicstats of each metric passing through. | ||||
| [[aggregators.basicstats]] | ||||
| 
 | ||||
|   ## General Aggregator Arguments: | ||||
| 
 | ||||
|   ## The period on which to flush & clear the aggregator. | ||||
|   period = "30s" | ||||
| 
 | ||||
|   ## If true, the original metric will be dropped by the | ||||
|   ## aggregator and will not get sent to the output plugins. | ||||
|   drop_original = false | ||||
| 
 | ||||
|   ## BasicStats Arguments: | ||||
| 
 | ||||
|   ## Configures which basic stats to push as fields | ||||
|   stats = ["count","min","max","mean","stdev","s2"] | ||||
| ``` | ||||
| 
 | ||||
| - stats | ||||
|     - If not specified, all stats are aggregated and pushed as fields | ||||
|     - If empty array, no stats are aggregated | ||||
| 
 | ||||
| ### Measurements & Fields: | ||||
| 
 | ||||
| - measurement1 | ||||
|  |  | |||
|  | @ -1,6 +1,7 @@ | |||
| package basicstats | ||||
| 
 | ||||
| import ( | ||||
| 	"log" | ||||
| 	"math" | ||||
| 
 | ||||
| 	"github.com/influxdata/telegraf" | ||||
|  | @ -8,10 +9,22 @@ import ( | |||
| ) | ||||
| 
 | ||||
| type BasicStats struct { | ||||
| 	cache map[uint64]aggregate | ||||
| 	Stats []string `toml:"stats"` | ||||
| 
 | ||||
| 	cache       map[uint64]aggregate | ||||
| 	statsConfig *configuredStats | ||||
| } | ||||
| 
 | ||||
| func NewBasicStats() telegraf.Aggregator { | ||||
| type configuredStats struct { | ||||
| 	count    bool | ||||
| 	min      bool | ||||
| 	max      bool | ||||
| 	mean     bool | ||||
| 	variance bool | ||||
| 	stdev    bool | ||||
| } | ||||
| 
 | ||||
| func NewBasicStats() *BasicStats { | ||||
| 	mm := &BasicStats{} | ||||
| 	mm.Reset() | ||||
| 	return mm | ||||
|  | @ -114,25 +127,103 @@ func (m *BasicStats) Add(in telegraf.Metric) { | |||
| } | ||||
| 
 | ||||
| func (m *BasicStats) Push(acc telegraf.Accumulator) { | ||||
| 
 | ||||
| 	config := getConfiguredStats(m) | ||||
| 
 | ||||
| 	for _, aggregate := range m.cache { | ||||
| 		fields := map[string]interface{}{} | ||||
| 		for k, v := range aggregate.fields { | ||||
| 			fields[k+"_count"] = v.count | ||||
| 			fields[k+"_min"] = v.min | ||||
| 			fields[k+"_max"] = v.max | ||||
| 			fields[k+"_mean"] = v.mean | ||||
| 
 | ||||
| 			if config.count { | ||||
| 				fields[k+"_count"] = v.count | ||||
| 			} | ||||
| 			if config.min { | ||||
| 				fields[k+"_min"] = v.min | ||||
| 			} | ||||
| 			if config.max { | ||||
| 				fields[k+"_max"] = v.max | ||||
| 			} | ||||
| 			if config.mean { | ||||
| 				fields[k+"_mean"] = v.mean | ||||
| 			} | ||||
| 
 | ||||
| 			//v.count always >=1
 | ||||
| 			if v.count > 1 { | ||||
| 				variance := v.M2 / (v.count - 1) | ||||
| 				fields[k+"_s2"] = variance | ||||
| 				fields[k+"_stdev"] = math.Sqrt(variance) | ||||
| 
 | ||||
| 				if config.variance { | ||||
| 					fields[k+"_s2"] = variance | ||||
| 				} | ||||
| 				if config.stdev { | ||||
| 					fields[k+"_stdev"] = math.Sqrt(variance) | ||||
| 				} | ||||
| 			} | ||||
| 			//if count == 1 StdDev = infinite => so I won't send data
 | ||||
| 		} | ||||
| 		acc.AddFields(aggregate.name, fields, aggregate.tags) | ||||
| 
 | ||||
| 		if len(fields) > 0 { | ||||
| 			acc.AddFields(aggregate.name, fields, aggregate.tags) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func parseStats(names []string) *configuredStats { | ||||
| 
 | ||||
| 	parsed := &configuredStats{} | ||||
| 
 | ||||
| 	for _, name := range names { | ||||
| 
 | ||||
| 		switch name { | ||||
| 
 | ||||
| 		case "count": | ||||
| 			parsed.count = true | ||||
| 		case "min": | ||||
| 			parsed.min = true | ||||
| 		case "max": | ||||
| 			parsed.max = true | ||||
| 		case "mean": | ||||
| 			parsed.mean = true | ||||
| 		case "s2": | ||||
| 			parsed.variance = true | ||||
| 		case "stdev": | ||||
| 			parsed.stdev = true | ||||
| 
 | ||||
| 		default: | ||||
| 			log.Printf("W! Unrecognized basic stat '%s', ignoring", name) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return parsed | ||||
| } | ||||
| 
 | ||||
| func defaultStats() *configuredStats { | ||||
| 
 | ||||
| 	defaults := &configuredStats{} | ||||
| 
 | ||||
| 	defaults.count = true | ||||
| 	defaults.min = true | ||||
| 	defaults.max = true | ||||
| 	defaults.mean = true | ||||
| 	defaults.variance = true | ||||
| 	defaults.stdev = true | ||||
| 
 | ||||
| 	return defaults | ||||
| } | ||||
| 
 | ||||
| func getConfiguredStats(m *BasicStats) *configuredStats { | ||||
| 
 | ||||
| 	if m.statsConfig == nil { | ||||
| 
 | ||||
| 		if m.Stats == nil { | ||||
| 			m.statsConfig = defaultStats() | ||||
| 		} else { | ||||
| 			m.statsConfig = parseStats(m.Stats) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return m.statsConfig | ||||
| } | ||||
| 
 | ||||
| func (m *BasicStats) Reset() { | ||||
| 	m.cache = make(map[uint64]aggregate) | ||||
| } | ||||
|  |  | |||
|  | @ -149,3 +149,211 @@ func TestBasicStatsDifferentPeriods(t *testing.T) { | |||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating count
 | ||||
| func TestBasicStatsWithOnlyCount(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"count"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_count": float64(2), | ||||
| 		"b_count": float64(2), | ||||
| 		"c_count": float64(2), | ||||
| 		"d_count": float64(2), | ||||
| 		"e_count": float64(1), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating minimum
 | ||||
| func TestBasicStatsWithOnlyMin(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"min"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_min": float64(1), | ||||
| 		"b_min": float64(1), | ||||
| 		"c_min": float64(2), | ||||
| 		"d_min": float64(2), | ||||
| 		"e_min": float64(200), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating maximum
 | ||||
| func TestBasicStatsWithOnlyMax(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"max"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_max": float64(1), | ||||
| 		"b_max": float64(3), | ||||
| 		"c_max": float64(4), | ||||
| 		"d_max": float64(6), | ||||
| 		"e_max": float64(200), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating mean
 | ||||
| func TestBasicStatsWithOnlyMean(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"mean"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_mean": float64(1), | ||||
| 		"b_mean": float64(2), | ||||
| 		"c_mean": float64(3), | ||||
| 		"d_mean": float64(4), | ||||
| 		"e_mean": float64(200), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating variance
 | ||||
| func TestBasicStatsWithOnlyVariance(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"s2"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_s2": float64(0), | ||||
| 		"b_s2": float64(2), | ||||
| 		"c_s2": float64(2), | ||||
| 		"d_s2": float64(8), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating standard deviation
 | ||||
| func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"stdev"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_stdev": float64(0), | ||||
| 		"b_stdev": math.Sqrt(2), | ||||
| 		"c_stdev": math.Sqrt(2), | ||||
| 		"d_stdev": math.Sqrt(8), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test only aggregating minimum and maximum
 | ||||
| func TestBasicStatsWithMinAndMax(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"min", "max"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	expectedFields := map[string]interface{}{ | ||||
| 		"a_max": float64(1), //a
 | ||||
| 		"a_min": float64(1), | ||||
| 		"b_max": float64(3), //b
 | ||||
| 		"b_min": float64(1), | ||||
| 		"c_max": float64(4), //c
 | ||||
| 		"c_min": float64(2), | ||||
| 		"d_max": float64(6), //d
 | ||||
| 		"d_min": float64(2), | ||||
| 		"e_max": float64(200), //e
 | ||||
| 		"e_min": float64(200), | ||||
| 	} | ||||
| 	expectedTags := map[string]string{ | ||||
| 		"foo": "bar", | ||||
| 	} | ||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||
| } | ||||
| 
 | ||||
| // Test that if an empty array is passed, no points are pushed
 | ||||
| func TestBasicStatsWithNoStats(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	acc.AssertDoesNotContainMeasurement(t, "m1") | ||||
| } | ||||
| 
 | ||||
| // Test that if an unknown stat is configured, it doesn't explode
 | ||||
| func TestBasicStatsWithUnknownStat(t *testing.T) { | ||||
| 
 | ||||
| 	aggregator := NewBasicStats() | ||||
| 	aggregator.Stats = []string{"crazy"} | ||||
| 
 | ||||
| 	aggregator.Add(m1) | ||||
| 	aggregator.Add(m2) | ||||
| 
 | ||||
| 	acc := testutil.Accumulator{} | ||||
| 	aggregator.Push(&acc) | ||||
| 
 | ||||
| 	acc.AssertDoesNotContainMeasurement(t, "m1") | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue