From 13a00eeca54c90c70c76a475c92abd1dfa37e500 Mon Sep 17 00:00:00 2001 From: Olli-Pekka Lehto Date: Mon, 1 Apr 2019 13:53:50 -0500 Subject: [PATCH] Add option to reset buckets on flush to histogram aggregator (#5641) --- plugins/aggregators/histogram/README.md | 9 ++++- plugins/aggregators/histogram/histogram.go | 19 ++++++++-- .../aggregators/histogram/histogram_test.go | 37 ++++++++++++++++--- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index b4525681e..f9dafd789 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -7,8 +7,9 @@ Values added to a bucket are also added to the larger buckets in the distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg). Like other Telegraf aggregators, the metric is emitted every `period` seconds. -Bucket counts however are not reset between periods and will be non-strictly -increasing while Telegraf is running. +By default bucket counts are not reset between periods and will be non-strictly +increasing while Telegraf is running. This behavior can be changed by setting the +`reset` parameter to true. #### Design @@ -34,6 +35,10 @@ of the algorithm which is implemented in the Prometheus ## aggregator and will not get sent to the output plugins. drop_original = false + ## If true, the histogram will be reset on flush instead + ## of accumulating the results. + reset = false + ## Example config that aggregates all fields of the metric. # [[aggregators.histogram.config]] # ## The set of buckets. diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index a60cede3d..a565d8902 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -16,7 +16,8 @@ const bucketInf = "+Inf" // HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics type HistogramAggregator struct { - Configs []config `toml:"config"` + Configs []config `toml:"config"` + ResetBuckets bool `toml:"reset"` buckets bucketsByMetrics cache map[uint64]metricHistogramCollection @@ -72,6 +73,10 @@ var sampleConfig = ` ## aggregator and will not get sent to the output plugins. drop_original = false + ## If true, the histogram will be reset on flush instead + ## of accumulating the results. + reset = false + ## Example config that aggregates all fields of the metric. # [[aggregators.histogram.config]] # ## The set of buckets. @@ -201,9 +206,15 @@ func (h *HistogramAggregator) groupField( ) } -// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has -// small value, we will get a histogram with a small amount of the distribution. -func (h *HistogramAggregator) Reset() {} +// Reset does nothing by default, because we typically need to collect counts for a long time. +// Otherwise if config parameter 'reset' has 'true' value, we will get a histogram +// with a small amount of the distribution. However in some use cases a reset is useful. +func (h *HistogramAggregator) Reset() { + if h.ResetBuckets { + h.resetCache() + h.buckets = make(bucketsByMetrics) + } +} // resetCache resets cached counts(hits) in the buckets func (h *HistogramAggregator) resetCache() { diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 8c4a2b9d3..694235831 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -12,8 +12,8 @@ import ( ) // NewTestHistogram creates new test histogram aggregation with specified config -func NewTestHistogram(cfg []config) telegraf.Aggregator { - htm := &HistogramAggregator{Configs: cfg} +func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator { + htm := &HistogramAggregator{Configs: cfg, ResetBuckets: reset} htm.buckets = make(bucketsByMetrics) htm.resetCache() @@ -69,11 +69,12 @@ func BenchmarkApply(b *testing.B) { func TestHistogramWithPeriodAndOneField(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg) + histogram := NewTestHistogram(cfg, false) acc := &testutil.Accumulator{} histogram.Add(firstMetric1) + histogram.Reset() histogram.Add(firstMetric2) histogram.Push(acc) @@ -88,12 +89,36 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) { assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) } +// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field +func TestHistogramWithReset(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg, true) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Reset() + histogram.Add(firstMetric2) + histogram.Push(acc) + + if len(acc.Metrics) != 6 { + assert.Fail(t, "Incorrect number of metrics") + } + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf) +} + // TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields func TestHistogramWithPeriodAndAllFields(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) - histogram := NewTestHistogram(cfg) + histogram := NewTestHistogram(cfg, false) acc := &testutil.Accumulator{} @@ -127,7 +152,7 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg) + histogram := NewTestHistogram(cfg, false) acc := &testutil.Accumulator{} histogram.Add(firstMetric1) @@ -166,7 +191,7 @@ func TestWrongBucketsOrder(t *testing.T) { var cfg []config cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}}) - histogram := NewTestHistogram(cfg) + histogram := NewTestHistogram(cfg, false) histogram.Add(firstMetric2) }