Add option to reset buckets on flush to histogram aggregator (#5641)
This commit is contained in:
parent
db1e902c81
commit
13a00eeca5
|
@ -7,8 +7,9 @@ Values added to a bucket are also added to the larger buckets in the
|
||||||
distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg).
|
distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg).
|
||||||
|
|
||||||
Like other Telegraf aggregators, the metric is emitted every `period` seconds.
|
Like other Telegraf aggregators, the metric is emitted every `period` seconds.
|
||||||
Bucket counts however are not reset between periods and will be non-strictly
|
By default bucket counts are not reset between periods and will be non-strictly
|
||||||
increasing while Telegraf is running.
|
increasing while Telegraf is running. This behavior can be changed by setting the
|
||||||
|
`reset` parameter to true.
|
||||||
|
|
||||||
#### Design
|
#### Design
|
||||||
|
|
||||||
|
@ -34,6 +35,10 @@ of the algorithm which is implemented in the Prometheus
|
||||||
## aggregator and will not get sent to the output plugins.
|
## aggregator and will not get sent to the output plugins.
|
||||||
drop_original = false
|
drop_original = false
|
||||||
|
|
||||||
|
## If true, the histogram will be reset on flush instead
|
||||||
|
## of accumulating the results.
|
||||||
|
reset = false
|
||||||
|
|
||||||
## Example config that aggregates all fields of the metric.
|
## Example config that aggregates all fields of the metric.
|
||||||
# [[aggregators.histogram.config]]
|
# [[aggregators.histogram.config]]
|
||||||
# ## The set of buckets.
|
# ## The set of buckets.
|
||||||
|
|
|
@ -17,6 +17,7 @@ const bucketInf = "+Inf"
|
||||||
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
|
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
|
||||||
type HistogramAggregator struct {
|
type HistogramAggregator struct {
|
||||||
Configs []config `toml:"config"`
|
Configs []config `toml:"config"`
|
||||||
|
ResetBuckets bool `toml:"reset"`
|
||||||
|
|
||||||
buckets bucketsByMetrics
|
buckets bucketsByMetrics
|
||||||
cache map[uint64]metricHistogramCollection
|
cache map[uint64]metricHistogramCollection
|
||||||
|
@ -72,6 +73,10 @@ var sampleConfig = `
|
||||||
## aggregator and will not get sent to the output plugins.
|
## aggregator and will not get sent to the output plugins.
|
||||||
drop_original = false
|
drop_original = false
|
||||||
|
|
||||||
|
## If true, the histogram will be reset on flush instead
|
||||||
|
## of accumulating the results.
|
||||||
|
reset = false
|
||||||
|
|
||||||
## Example config that aggregates all fields of the metric.
|
## Example config that aggregates all fields of the metric.
|
||||||
# [[aggregators.histogram.config]]
|
# [[aggregators.histogram.config]]
|
||||||
# ## The set of buckets.
|
# ## The set of buckets.
|
||||||
|
@ -201,9 +206,15 @@ func (h *HistogramAggregator) groupField(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has
|
// Reset does nothing by default, because we typically need to collect counts for a long time.
|
||||||
// small value, we will get a histogram with a small amount of the distribution.
|
// Otherwise if config parameter 'reset' has 'true' value, we will get a histogram
|
||||||
func (h *HistogramAggregator) Reset() {}
|
// with a small amount of the distribution. However in some use cases a reset is useful.
|
||||||
|
func (h *HistogramAggregator) Reset() {
|
||||||
|
if h.ResetBuckets {
|
||||||
|
h.resetCache()
|
||||||
|
h.buckets = make(bucketsByMetrics)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// resetCache resets cached counts(hits) in the buckets
|
// resetCache resets cached counts(hits) in the buckets
|
||||||
func (h *HistogramAggregator) resetCache() {
|
func (h *HistogramAggregator) resetCache() {
|
||||||
|
|
|
@ -12,8 +12,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTestHistogram creates new test histogram aggregation with specified config
|
// NewTestHistogram creates new test histogram aggregation with specified config
|
||||||
func NewTestHistogram(cfg []config) telegraf.Aggregator {
|
func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator {
|
||||||
htm := &HistogramAggregator{Configs: cfg}
|
htm := &HistogramAggregator{Configs: cfg, ResetBuckets: reset}
|
||||||
htm.buckets = make(bucketsByMetrics)
|
htm.buckets = make(bucketsByMetrics)
|
||||||
htm.resetCache()
|
htm.resetCache()
|
||||||
|
|
||||||
|
@ -69,11 +69,12 @@ func BenchmarkApply(b *testing.B) {
|
||||||
func TestHistogramWithPeriodAndOneField(t *testing.T) {
|
func TestHistogramWithPeriodAndOneField(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg)
|
histogram := NewTestHistogram(cfg, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
histogram.Add(firstMetric1)
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Reset()
|
||||||
histogram.Add(firstMetric2)
|
histogram.Add(firstMetric2)
|
||||||
histogram.Push(acc)
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
@ -88,12 +89,36 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) {
|
||||||
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field
|
||||||
|
func TestHistogramWithReset(t *testing.T) {
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
|
histogram := NewTestHistogram(cfg, true)
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Reset()
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
if len(acc.Metrics) != 6 {
|
||||||
|
assert.Fail(t, "Incorrect number of metrics")
|
||||||
|
}
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "40")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf)
|
||||||
|
}
|
||||||
|
|
||||||
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
|
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
|
||||||
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
|
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
||||||
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
histogram := NewTestHistogram(cfg)
|
histogram := NewTestHistogram(cfg, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
@ -127,7 +152,7 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) {
|
||||||
|
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg)
|
histogram := NewTestHistogram(cfg, false)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
histogram.Add(firstMetric1)
|
histogram.Add(firstMetric1)
|
||||||
|
@ -166,7 +191,7 @@ func TestWrongBucketsOrder(t *testing.T) {
|
||||||
|
|
||||||
var cfg []config
|
var cfg []config
|
||||||
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
|
||||||
histogram := NewTestHistogram(cfg)
|
histogram := NewTestHistogram(cfg, false)
|
||||||
histogram.Add(firstMetric2)
|
histogram.Add(firstMetric2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue