Add sum stat to basicstats aggregator (#3797)
This commit is contained in:
		
							parent
							
								
									00a52a67b9
								
							
						
					
					
						commit
						0a37386c5e
					
				|  | @ -1,6 +1,6 @@ | ||||||
| # BasicStats Aggregator Plugin | # BasicStats Aggregator Plugin | ||||||
| 
 | 
 | ||||||
| The BasicStats aggregator plugin give us count,max,min,mean,s2(variance), stdev for a set of values, | The BasicStats aggregator plugin give us count,max,min,mean,sum,s2(variance), stdev for a set of values, | ||||||
| emitting the aggregate every `period` seconds. | emitting the aggregate every `period` seconds. | ||||||
| 
 | 
 | ||||||
| ### Configuration: | ### Configuration: | ||||||
|  | @ -21,11 +21,11 @@ emitting the aggregate every `period` seconds. | ||||||
|   ## BasicStats Arguments: |   ## BasicStats Arguments: | ||||||
| 
 | 
 | ||||||
|   ## Configures which basic stats to push as fields |   ## Configures which basic stats to push as fields | ||||||
|   stats = ["count","min","max","mean","stdev","s2"] |   stats = ["count","min","max","mean","stdev","s2","sum"] | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
| - stats | - stats | ||||||
|     - If not specified, all stats are aggregated and pushed as fields |     - If not specified, then `count`, `min`, `max`, `mean`, `stdev`, and `s2` are aggregated and pushed as fields.  `sum` is not aggregated by default to maintain backwards compatibility. | ||||||
|     - If empty array, no stats are aggregated |     - If empty array, no stats are aggregated | ||||||
| 
 | 
 | ||||||
| ### Measurements & Fields: | ### Measurements & Fields: | ||||||
|  | @ -35,6 +35,7 @@ emitting the aggregate every `period` seconds. | ||||||
|     - field1_max |     - field1_max | ||||||
|     - field1_min |     - field1_min | ||||||
|     - field1_mean |     - field1_mean | ||||||
|  |     - field1_sum | ||||||
|     - field1_s2 (variance) |     - field1_s2 (variance) | ||||||
|     - field1_stdev (standard deviation) |     - field1_stdev (standard deviation) | ||||||
| 
 | 
 | ||||||
|  | @ -48,8 +49,8 @@ No tags are applied by this aggregator. | ||||||
| $ telegraf --config telegraf.conf --quiet | $ telegraf --config telegraf.conf --quiet | ||||||
| system,host=tars load1=1 1475583980000000000 | system,host=tars load1=1 1475583980000000000 | ||||||
| system,host=tars load1=1 1475583990000000000 | system,host=tars load1=1 1475583990000000000 | ||||||
| system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000 | system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000 | ||||||
| system,host=tars load1=1 1475584020000000000 | system,host=tars load1=1 1475584020000000000 | ||||||
| system,host=tars load1=3 1475584030000000000 | system,host=tars load1=3 1475584030000000000 | ||||||
| system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414162 1475584010000000000 | system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000 | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
|  | @ -22,6 +22,7 @@ type configuredStats struct { | ||||||
| 	mean     bool | 	mean     bool | ||||||
| 	variance bool | 	variance bool | ||||||
| 	stdev    bool | 	stdev    bool | ||||||
|  | 	sum      bool | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewBasicStats() *BasicStats { | func NewBasicStats() *BasicStats { | ||||||
|  | @ -40,6 +41,7 @@ type basicstats struct { | ||||||
| 	count float64 | 	count float64 | ||||||
| 	min   float64 | 	min   float64 | ||||||
| 	max   float64 | 	max   float64 | ||||||
|  | 	sum   float64 | ||||||
| 	mean  float64 | 	mean  float64 | ||||||
| 	M2    float64 //intermedia value for variance/stdev
 | 	M2    float64 //intermedia value for variance/stdev
 | ||||||
| } | } | ||||||
|  | @ -77,6 +79,7 @@ func (m *BasicStats) Add(in telegraf.Metric) { | ||||||
| 					min:   fv, | 					min:   fv, | ||||||
| 					max:   fv, | 					max:   fv, | ||||||
| 					mean:  fv, | 					mean:  fv, | ||||||
|  | 					sum:   fv, | ||||||
| 					M2:    0.0, | 					M2:    0.0, | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  | @ -92,6 +95,7 @@ func (m *BasicStats) Add(in telegraf.Metric) { | ||||||
| 						min:   fv, | 						min:   fv, | ||||||
| 						max:   fv, | 						max:   fv, | ||||||
| 						mean:  fv, | 						mean:  fv, | ||||||
|  | 						sum:   fv, | ||||||
| 						M2:    0.0, | 						M2:    0.0, | ||||||
| 					} | 					} | ||||||
| 					continue | 					continue | ||||||
|  | @ -119,6 +123,8 @@ func (m *BasicStats) Add(in telegraf.Metric) { | ||||||
| 				} else if fv > tmp.max { | 				} else if fv > tmp.max { | ||||||
| 					tmp.max = fv | 					tmp.max = fv | ||||||
| 				} | 				} | ||||||
|  | 				//sum compute
 | ||||||
|  | 				tmp.sum += fv | ||||||
| 				//store final data
 | 				//store final data
 | ||||||
| 				m.cache[id].fields[k] = tmp | 				m.cache[id].fields[k] = tmp | ||||||
| 			} | 			} | ||||||
|  | @ -146,6 +152,9 @@ func (m *BasicStats) Push(acc telegraf.Accumulator) { | ||||||
| 			if config.mean { | 			if config.mean { | ||||||
| 				fields[k+"_mean"] = v.mean | 				fields[k+"_mean"] = v.mean | ||||||
| 			} | 			} | ||||||
|  | 			if config.sum { | ||||||
|  | 				fields[k+"_sum"] = v.sum | ||||||
|  | 			} | ||||||
| 
 | 
 | ||||||
| 			//v.count always >=1
 | 			//v.count always >=1
 | ||||||
| 			if v.count > 1 { | 			if v.count > 1 { | ||||||
|  | @ -187,6 +196,8 @@ func parseStats(names []string) *configuredStats { | ||||||
| 			parsed.variance = true | 			parsed.variance = true | ||||||
| 		case "stdev": | 		case "stdev": | ||||||
| 			parsed.stdev = true | 			parsed.stdev = true | ||||||
|  | 		case "sum": | ||||||
|  | 			parsed.sum = true | ||||||
| 
 | 
 | ||||||
| 		default: | 		default: | ||||||
| 			log.Printf("W! Unrecognized basic stat '%s', ignoring", name) | 			log.Printf("W! Unrecognized basic stat '%s', ignoring", name) | ||||||
|  | @ -206,6 +217,7 @@ func defaultStats() *configuredStats { | ||||||
| 	defaults.mean = true | 	defaults.mean = true | ||||||
| 	defaults.variance = true | 	defaults.variance = true | ||||||
| 	defaults.stdev = true | 	defaults.stdev = true | ||||||
|  | 	defaults.sum = false | ||||||
| 
 | 
 | ||||||
| 	return defaults | 	return defaults | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -7,6 +7,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdata/telegraf/metric" | 	"github.com/influxdata/telegraf/metric" | ||||||
| 	"github.com/influxdata/telegraf/testutil" | 	"github.com/influxdata/telegraf/testutil" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var m1, _ = metric.New("m1", | var m1, _ = metric.New("m1", | ||||||
|  | @ -250,6 +251,83 @@ func TestBasicStatsWithOnlyMean(t *testing.T) { | ||||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // Test only aggregating sum
 | ||||||
|  | func TestBasicStatsWithOnlySum(t *testing.T) { | ||||||
|  | 
 | ||||||
|  | 	aggregator := NewBasicStats() | ||||||
|  | 	aggregator.Stats = []string{"sum"} | ||||||
|  | 
 | ||||||
|  | 	aggregator.Add(m1) | ||||||
|  | 	aggregator.Add(m2) | ||||||
|  | 
 | ||||||
|  | 	acc := testutil.Accumulator{} | ||||||
|  | 	aggregator.Push(&acc) | ||||||
|  | 
 | ||||||
|  | 	expectedFields := map[string]interface{}{ | ||||||
|  | 		"a_sum": float64(2), | ||||||
|  | 		"b_sum": float64(4), | ||||||
|  | 		"c_sum": float64(6), | ||||||
|  | 		"d_sum": float64(8), | ||||||
|  | 		"e_sum": float64(200), | ||||||
|  | 	} | ||||||
|  | 	expectedTags := map[string]string{ | ||||||
|  | 		"foo": "bar", | ||||||
|  | 	} | ||||||
|  | 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Verify that sum doesn't suffer from floating point errors.  Early
 | ||||||
|  | // implementations of sum were calulated from mean and count, which
 | ||||||
|  | // e.g. summed "1, 1, 5, 1" as "7.999999..." instead of 8.
 | ||||||
|  | func TestBasicStatsWithOnlySumFloatingPointErrata(t *testing.T) { | ||||||
|  | 
 | ||||||
|  | 	var sum1, _ = metric.New("m1", | ||||||
|  | 		map[string]string{}, | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"a": int64(1), | ||||||
|  | 		}, | ||||||
|  | 		time.Now(), | ||||||
|  | 	) | ||||||
|  | 	var sum2, _ = metric.New("m1", | ||||||
|  | 		map[string]string{}, | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"a": int64(1), | ||||||
|  | 		}, | ||||||
|  | 		time.Now(), | ||||||
|  | 	) | ||||||
|  | 	var sum3, _ = metric.New("m1", | ||||||
|  | 		map[string]string{}, | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"a": int64(5), | ||||||
|  | 		}, | ||||||
|  | 		time.Now(), | ||||||
|  | 	) | ||||||
|  | 	var sum4, _ = metric.New("m1", | ||||||
|  | 		map[string]string{}, | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"a": int64(1), | ||||||
|  | 		}, | ||||||
|  | 		time.Now(), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	aggregator := NewBasicStats() | ||||||
|  | 	aggregator.Stats = []string{"sum"} | ||||||
|  | 
 | ||||||
|  | 	aggregator.Add(sum1) | ||||||
|  | 	aggregator.Add(sum2) | ||||||
|  | 	aggregator.Add(sum3) | ||||||
|  | 	aggregator.Add(sum4) | ||||||
|  | 
 | ||||||
|  | 	acc := testutil.Accumulator{} | ||||||
|  | 	aggregator.Push(&acc) | ||||||
|  | 
 | ||||||
|  | 	expectedFields := map[string]interface{}{ | ||||||
|  | 		"a_sum": float64(8), | ||||||
|  | 	} | ||||||
|  | 	expectedTags := map[string]string{} | ||||||
|  | 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Test only aggregating variance
 | // Test only aggregating variance
 | ||||||
| func TestBasicStatsWithOnlyVariance(t *testing.T) { | func TestBasicStatsWithOnlyVariance(t *testing.T) { | ||||||
| 
 | 
 | ||||||
|  | @ -328,6 +406,57 @@ func TestBasicStatsWithMinAndMax(t *testing.T) { | ||||||
| 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // Test aggregating with all stats
 | ||||||
|  | func TestBasicStatsWithAllStats(t *testing.T) { | ||||||
|  | 	acc := testutil.Accumulator{} | ||||||
|  | 	minmax := NewBasicStats() | ||||||
|  | 	minmax.Stats = []string{"count", "min", "max", "mean", "stdev", "s2", "sum"} | ||||||
|  | 
 | ||||||
|  | 	minmax.Add(m1) | ||||||
|  | 	minmax.Add(m2) | ||||||
|  | 	minmax.Push(&acc) | ||||||
|  | 
 | ||||||
|  | 	expectedFields := map[string]interface{}{ | ||||||
|  | 		"a_count": float64(2), //a
 | ||||||
|  | 		"a_max":   float64(1), | ||||||
|  | 		"a_min":   float64(1), | ||||||
|  | 		"a_mean":  float64(1), | ||||||
|  | 		"a_stdev": float64(0), | ||||||
|  | 		"a_s2":    float64(0), | ||||||
|  | 		"a_sum":   float64(2), | ||||||
|  | 		"b_count": float64(2), //b
 | ||||||
|  | 		"b_max":   float64(3), | ||||||
|  | 		"b_min":   float64(1), | ||||||
|  | 		"b_mean":  float64(2), | ||||||
|  | 		"b_s2":    float64(2), | ||||||
|  | 		"b_sum":   float64(4), | ||||||
|  | 		"b_stdev": math.Sqrt(2), | ||||||
|  | 		"c_count": float64(2), //c
 | ||||||
|  | 		"c_max":   float64(4), | ||||||
|  | 		"c_min":   float64(2), | ||||||
|  | 		"c_mean":  float64(3), | ||||||
|  | 		"c_s2":    float64(2), | ||||||
|  | 		"c_stdev": math.Sqrt(2), | ||||||
|  | 		"c_sum":   float64(6), | ||||||
|  | 		"d_count": float64(2), //d
 | ||||||
|  | 		"d_max":   float64(6), | ||||||
|  | 		"d_min":   float64(2), | ||||||
|  | 		"d_mean":  float64(4), | ||||||
|  | 		"d_s2":    float64(8), | ||||||
|  | 		"d_stdev": math.Sqrt(8), | ||||||
|  | 		"d_sum":   float64(8), | ||||||
|  | 		"e_count": float64(1), //e
 | ||||||
|  | 		"e_max":   float64(200), | ||||||
|  | 		"e_min":   float64(200), | ||||||
|  | 		"e_mean":  float64(200), | ||||||
|  | 		"e_sum":   float64(200), | ||||||
|  | 	} | ||||||
|  | 	expectedTags := map[string]string{ | ||||||
|  | 		"foo": "bar", | ||||||
|  | 	} | ||||||
|  | 	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Test that if an empty array is passed, no points are pushed
 | // Test that if an empty array is passed, no points are pushed
 | ||||||
| func TestBasicStatsWithNoStats(t *testing.T) { | func TestBasicStatsWithNoStats(t *testing.T) { | ||||||
| 
 | 
 | ||||||
|  | @ -357,3 +486,26 @@ func TestBasicStatsWithUnknownStat(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	acc.AssertDoesNotContainMeasurement(t, "m1") | 	acc.AssertDoesNotContainMeasurement(t, "m1") | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // Test that if Stats isn't supplied, then we only do count, min, max, mean,
 | ||||||
|  | // stdev, and s2.  We purposely exclude sum for backwards compatability,
 | ||||||
|  | // otherwise user's working systems will suddenly (and surprisingly) start
 | ||||||
|  | // capturing sum without their input.
 | ||||||
|  | func TestBasicStatsWithDefaultStats(t *testing.T) { | ||||||
|  | 
 | ||||||
|  | 	aggregator := NewBasicStats() | ||||||
|  | 
 | ||||||
|  | 	aggregator.Add(m1) | ||||||
|  | 	aggregator.Add(m2) | ||||||
|  | 
 | ||||||
|  | 	acc := testutil.Accumulator{} | ||||||
|  | 	aggregator.Push(&acc) | ||||||
|  | 
 | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_count")) | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_min")) | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_max")) | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_mean")) | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_stdev")) | ||||||
|  | 	assert.True(t, acc.HasField("m1", "a_s2")) | ||||||
|  | 	assert.False(t, acc.HasField("m1", "a_sum")) | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue