Add non-cumulative histogram (#7071)

This commit is contained in:
Felix Edelmann 2020-03-02 19:59:19 +01:00 committed by GitHub
parent a6dc099be4
commit 32d80d2a08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 228 additions and 115 deletions

View File

@ -3,8 +3,9 @@
The histogram aggregator plugin creates histograms containing the counts of The histogram aggregator plugin creates histograms containing the counts of
field values within a range. field values within a range.
Values added to a bucket are also added to the larger buckets in the If `cumulative` is set to true, values added to a bucket are also added to the
distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg). larger buckets in the distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg).
Otherwise, values are added to only one bucket, which creates an [ordinary histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg)
Like other Telegraf aggregators, the metric is emitted every `period` seconds. Like other Telegraf aggregators, the metric is emitted every `period` seconds.
By default bucket counts are not reset between periods and will be non-strictly By default bucket counts are not reset between periods and will be non-strictly
@ -16,7 +17,7 @@ increasing while Telegraf is running. This behavior can be changed by setting th
Each metric is passed to the aggregator and this aggregator searches Each metric is passed to the aggregator and this aggregator searches
histogram buckets for those fields, which have been specified in the histogram buckets for those fields, which have been specified in the
config. If buckets are found, the aggregator will increment +1 to the appropriate config. If buckets are found, the aggregator will increment +1 to the appropriate
bucket otherwise it will be added to the `+Inf` bucket. Every `period` bucket. Otherwise, it will be added to the `+Inf` bucket. Every `period`
seconds this data will be forwarded to the outputs. seconds this data will be forwarded to the outputs.
The algorithm of hit counting to buckets was implemented on the base The algorithm of hit counting to buckets was implemented on the base
@ -39,16 +40,20 @@ of the algorithm which is implemented in the Prometheus
## of accumulating the results. ## of accumulating the results.
reset = false reset = false
## Whether bucket values should be accumulated. If set to false, "gt" tag will be added.
## Defaults to true.
cumulative = true
## Example config that aggregates all fields of the metric. ## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]] # [[aggregators.histogram.config]]
# ## The set of buckets. # ## Right borders of buckets (with +Inf implicitly added).
# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] # buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
# ## The name of metric. # ## The name of metric.
# measurement_name = "cpu" # measurement_name = "cpu"
## Example config that aggregates only specific fields of the metric. ## Example config that aggregates only specific fields of the metric.
# [[aggregators.histogram.config]] # [[aggregators.histogram.config]]
# ## The set of buckets. # ## Right borders of buckets (with +Inf implicitly added).
# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
# ## The name of metric. # ## The name of metric.
# measurement_name = "diskio" # measurement_name = "diskio"
@ -64,8 +69,9 @@ option. Optionally, if `fields` is set only the fields listed will be
aggregated. If `fields` is not set all fields are aggregated. aggregated. If `fields` is not set all fields are aggregated.
The `buckets` option contains a list of floats which specify the bucket The `buckets` option contains a list of floats which specify the bucket
boundaries. Each float value defines the inclusive upper bound of the bucket. boundaries. Each float value defines the inclusive upper (right) bound of the bucket.
The `+Inf` bucket is added automatically and does not need to be defined. The `+Inf` bucket is added automatically and does not need to be defined.
(For left boundaries, these specified bucket borders and `-Inf` will be used).
### Measurements & Fields: ### Measurements & Fields:
@ -77,26 +83,43 @@ The postfix `bucket` will be added to each field key.
### Tags: ### Tags:
All measurements are given the tag `le`. This tag has the border value of * `cumulative = true` (default):
bucket. It means that the metric value is less than or equal to the value of * `le`: Right bucket border. It means that the metric value is less than or
this tag. For example, let assume that we have the metric value 10 and the equal to the value of this tag. If a metric value is sorted into a bucket,
following buckets: [5, 10, 30, 70, 100]. Then the tag `le` will have the value it is also sorted into all larger buckets. As a result, the value of
10, because the metrics value is passed into bucket with right border value `<field>_bucket` is rising with rising `le` value. When `le` is `+Inf`,
`10`. the bucket value is the count of all metrics, because all metric values are
less than or equal to positive infinity.
* `cumulative = false`:
* `gt`: Left bucket border. It means that the metric value is greater than
(and not equal to) the value of this tag.
* `le`: Right bucket border. It means that the metric value is less than or
equal to the value of this tag.
* As both `gt` and `le` are present, each metric is sorted in only exactly
one bucket.
### Example Output: ### Example Output:
Let assume we have the buckets [0, 10, 50, 100] and the following field values
for `usage_idle`: [50, 7, 99, 12]
With `cumulative = true`:
``` ```
cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000 cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000 # none
cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=0i 1486998330000000000 cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=1i 1486998330000000000 # 7
cpu,cpu=cpu1,host=localhost,le=20.0 usage_idle_bucket=1i 1486998330000000000 cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000 # 7, 12
cpu,cpu=cpu1,host=localhost,le=30.0 usage_idle_bucket=2i 1486998330000000000 cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=4i 1486998330000000000 # 7, 12, 50, 99
cpu,cpu=cpu1,host=localhost,le=40.0 usage_idle_bucket=2i 1486998330000000000 cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=4i 1486998330000000000 # 7, 12, 50, 99
cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000 ```
cpu,cpu=cpu1,host=localhost,le=60.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=70.0 usage_idle_bucket=2i 1486998330000000000 With `cumulative = false`:
cpu,cpu=cpu1,host=localhost,le=80.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=90.0 usage_idle_bucket=2i 1486998330000000000 ```
cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=2i 1486998330000000000 cpu,cpu=cpu1,host=localhost,gt=-Inf,le=0.0 usage_idle_bucket=0i 1486998330000000000 # none
cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=2i 1486998330000000000 cpu,cpu=cpu1,host=localhost,gt=0.0,le=10.0 usage_idle_bucket=1i 1486998330000000000 # 7
cpu,cpu=cpu1,host=localhost,gt=10.0,le=50.0 usage_idle_bucket=1i 1486998330000000000 # 12
cpu,cpu=cpu1,host=localhost,gt=50.0,le=100.0 usage_idle_bucket=2i 1486998330000000000 # 50, 99
cpu,cpu=cpu1,host=localhost,gt=100.0,le=+Inf usage_idle_bucket=0i 1486998330000000000 # none
``` ```

View File

@ -8,16 +8,23 @@ import (
"github.com/influxdata/telegraf/plugins/aggregators" "github.com/influxdata/telegraf/plugins/aggregators"
) )
// bucketTag is the tag, which contains right bucket border // bucketRightTag is the tag, which contains right bucket border
const bucketTag = "le" const bucketRightTag = "le"
// bucketInf is the right bucket border for infinite values // bucketPosInf is the right bucket border for infinite values
const bucketInf = "+Inf" const bucketPosInf = "+Inf"
// bucketLeftTag is the tag, which contains left bucket border (exclusive)
const bucketLeftTag = "gt"
// bucketNegInf is the left bucket border for infinite values
const bucketNegInf = "-Inf"
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics // HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
type HistogramAggregator struct { type HistogramAggregator struct {
Configs []config `toml:"config"` Configs []config `toml:"config"`
ResetBuckets bool `toml:"reset"` ResetBuckets bool `toml:"reset"`
Cumulative bool `toml:"cumulative"`
buckets bucketsByMetrics buckets bucketsByMetrics
cache map[uint64]metricHistogramCollection cache map[uint64]metricHistogramCollection
@ -57,8 +64,10 @@ type groupedByCountFields struct {
} }
// NewHistogramAggregator creates new histogram aggregator // NewHistogramAggregator creates new histogram aggregator
func NewHistogramAggregator() telegraf.Aggregator { func NewHistogramAggregator() *HistogramAggregator {
h := &HistogramAggregator{} h := &HistogramAggregator{
Cumulative: true,
}
h.buckets = make(bucketsByMetrics) h.buckets = make(bucketsByMetrics)
h.resetCache() h.resetCache()
@ -77,16 +86,20 @@ var sampleConfig = `
## of accumulating the results. ## of accumulating the results.
reset = false reset = false
## Whether bucket values should be accumulated. If set to false, "gt" tag will be added.
## Defaults to true.
cumulative = true
## Example config that aggregates all fields of the metric. ## Example config that aggregates all fields of the metric.
# [[aggregators.histogram.config]] # [[aggregators.histogram.config]]
# ## The set of buckets. # ## Right borders of buckets (with +Inf implicitly added).
# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] # buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
# ## The name of metric. # ## The name of metric.
# measurement_name = "cpu" # measurement_name = "cpu"
## Example config that aggregates only specific fields of the metric. ## Example config that aggregates only specific fields of the metric.
# [[aggregators.histogram.config]] # [[aggregators.histogram.config]]
# ## The set of buckets. # ## Right borders of buckets (with +Inf implicitly added).
# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
# ## The name of metric. # ## The name of metric.
# measurement_name = "diskio" # measurement_name = "diskio"
@ -167,18 +180,27 @@ func (h *HistogramAggregator) groupFieldsByBuckets(
tags map[string]string, tags map[string]string,
counts []int64, counts []int64,
) { ) {
count := int64(0) sum := int64(0)
for index, bucket := range h.getBuckets(name, field) { buckets := h.getBuckets(name, field) // note that len(buckets) + 1 == len(counts)
count += counts[index]
tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64) for index, count := range counts {
h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags)) if !h.Cumulative {
sum = 0 // reset sum -> don't store cumulative counts
tags[bucketLeftTag] = bucketNegInf
if index > 0 {
tags[bucketLeftTag] = strconv.FormatFloat(buckets[index-1], 'f', -1, 64)
}
}
tags[bucketRightTag] = bucketPosInf
if index < len(buckets) {
tags[bucketRightTag] = strconv.FormatFloat(buckets[index], 'f', -1, 64)
}
sum += count
h.groupField(metricsWithGroupedFields, name, field, sum, copyTags(tags))
} }
count += counts[len(counts)-1]
tags[bucketTag] = bucketInf
h.groupField(metricsWithGroupedFields, name, field, count, tags)
} }
// groupField groups field by count value // groupField groups field by count value

View File

@ -11,11 +11,15 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
type fields map[string]interface{}
type tags map[string]string
// NewTestHistogram creates new test histogram aggregation with specified config // NewTestHistogram creates new test histogram aggregation with specified config
func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator { func NewTestHistogram(cfg []config, reset bool, cumulative bool) telegraf.Aggregator {
htm := &HistogramAggregator{Configs: cfg, ResetBuckets: reset} htm := NewHistogramAggregator()
htm.buckets = make(bucketsByMetrics) htm.Configs = cfg
htm.resetCache() htm.ResetBuckets = reset
htm.Cumulative = cumulative
return htm return htm
} }
@ -23,8 +27,8 @@ func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator {
// firstMetric1 is the first test metric // firstMetric1 is the first test metric
var firstMetric1, _ = metric.New( var firstMetric1, _ = metric.New(
"first_metric_name", "first_metric_name",
map[string]string{"tag_name": "tag_value"}, tags{},
map[string]interface{}{ fields{
"a": float64(15.3), "a": float64(15.3),
"b": float64(40), "b": float64(40),
}, },
@ -34,8 +38,8 @@ var firstMetric1, _ = metric.New(
// firstMetric1 is the first test metric with other value // firstMetric1 is the first test metric with other value
var firstMetric2, _ = metric.New( var firstMetric2, _ = metric.New(
"first_metric_name", "first_metric_name",
map[string]string{"tag_name": "tag_value"}, tags{},
map[string]interface{}{ fields{
"a": float64(15.9), "a": float64(15.9),
"c": float64(40), "c": float64(40),
}, },
@ -45,8 +49,8 @@ var firstMetric2, _ = metric.New(
// secondMetric is the second metric // secondMetric is the second metric
var secondMetric, _ = metric.New( var secondMetric, _ = metric.New(
"second_metric_name", "second_metric_name",
map[string]string{"tag_name": "tag_value"}, tags{},
map[string]interface{}{ fields{
"a": float64(105), "a": float64(105),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
@ -65,11 +69,11 @@ func BenchmarkApply(b *testing.B) {
} }
} }
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field // TestHistogram tests metrics for one period and for one field
func TestHistogramWithPeriodAndOneField(t *testing.T) { func TestHistogram(t *testing.T) {
var cfg []config var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, false) histogram := NewTestHistogram(cfg, false, true)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -81,19 +85,43 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) {
if len(acc.Metrics) != 6 { if len(acc.Metrics) != 6 {
assert.Fail(t, "Incorrect number of metrics") assert.Fail(t, "Incorrect number of metrics")
} }
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketRightTag: bucketPosInf})
} }
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field // TestHistogramNonCumulative tests metrics for one period and for one field
func TestHistogramNonCumulative(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, false, false)
acc := &testutil.Accumulator{}
histogram.Add(firstMetric1)
histogram.Reset()
histogram.Add(firstMetric2)
histogram.Push(acc)
if len(acc.Metrics) != 6 {
assert.Fail(t, "Incorrect number of metrics")
}
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketLeftTag: bucketNegInf, bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketLeftTag: "0", bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2)}, tags{bucketLeftTag: "10", bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketLeftTag: "20", bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketLeftTag: "30", bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketLeftTag: "40", bucketRightTag: bucketPosInf})
}
// TestHistogramWithReset tests metrics for one period and for one field, with reset between metrics adding
func TestHistogramWithReset(t *testing.T) { func TestHistogramWithReset(t *testing.T) {
var cfg []config var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, true) histogram := NewTestHistogram(cfg, true, true)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -105,20 +133,20 @@ func TestHistogramWithReset(t *testing.T) {
if len(acc.Metrics) != 6 { if len(acc.Metrics) != 6 {
assert.Fail(t, "Incorrect number of metrics") assert.Fail(t, "Incorrect number of metrics")
} }
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1)}, tags{bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1)}, tags{bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf) assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1)}, tags{bucketRightTag: bucketPosInf})
} }
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields // TestHistogramWithAllFields tests two metrics for one period and for all fields
func TestHistogramWithPeriodAndAllFields(t *testing.T) { func TestHistogramWithAllFields(t *testing.T) {
var cfg []config var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogram(cfg, false) histogram := NewTestHistogram(cfg, false, true)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -131,50 +159,83 @@ func TestHistogramWithPeriodAndAllFields(t *testing.T) {
assert.Fail(t, "Incorrect number of metrics") assert.Fail(t, "Incorrect number of metrics")
} }
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, "15.5") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "15.5"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, tags{bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, tags{bucketRightTag: bucketPosInf})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4") assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "4"})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23") assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "23"})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30") assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf) assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketRightTag: bucketPosInf})
} }
// TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates // TestHistogramWithAllFieldsNonCumulative tests two metrics for one period and for all fields
func TestHistogramWithAllFieldsNonCumulative(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogram(cfg, false, false)
acc := &testutil.Accumulator{}
histogram.Add(firstMetric1)
histogram.Add(firstMetric2)
histogram.Add(secondMetric)
histogram.Push(acc)
if len(acc.Metrics) != 12 {
assert.Fail(t, "Incorrect number of metrics")
}
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketLeftTag: bucketNegInf, bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketLeftTag: "0", bucketRightTag: "15.5"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketLeftTag: "15.5", bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketLeftTag: "20", bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(1), "c_bucket": int64(1)}, tags{bucketLeftTag: "30", bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketLeftTag: "40", bucketRightTag: bucketPosInf})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: bucketNegInf, bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: "0", bucketRightTag: "4"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: "4", bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: "10", bucketRightTag: "23"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: "23", bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "second_metric_name", fields{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, tags{bucketLeftTag: "30", bucketRightTag: bucketPosInf})
}
// TestHistogramWithTwoPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates
// getting added in different periods) for all fields // getting added in different periods) for all fields
func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { func TestHistogramWithTwoPeriodsAndAllFields(t *testing.T) {
var cfg []config var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, false) histogram := NewTestHistogram(cfg, false, true)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
histogram.Add(firstMetric1) histogram.Add(firstMetric1)
histogram.Push(acc) histogram.Push(acc)
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "20") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(0)}, tags{bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(1)}, tags{bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf) assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(1), "b_bucket": int64(1)}, tags{bucketRightTag: bucketPosInf})
acc.ClearMetrics() acc.ClearMetrics()
histogram.Add(firstMetric2) histogram.Add(firstMetric2)
histogram.Push(acc) histogram.Push(acc)
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "0"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "10"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "20"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, tags{bucketRightTag: "30"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, tags{bucketRightTag: "40"})
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) assertContainsTaggedField(t, acc, "first_metric_name", fields{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, tags{bucketRightTag: bucketPosInf})
} }
// TestWrongBucketsOrder tests the calling panic with incorrect order of buckets // TestWrongBucketsOrder tests the calling panic with incorrect order of buckets
@ -191,35 +252,42 @@ func TestWrongBucketsOrder(t *testing.T) {
var cfg []config var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}}) cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg, false) histogram := NewTestHistogram(cfg, false, true)
histogram.Add(firstMetric2) histogram.Add(firstMetric2)
} }
// assertContainsTaggedField is help functions to test histogram data // assertContainsTaggedField is help functions to test histogram data
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, le string) { func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, tags map[string]string) {
acc.Lock() acc.Lock()
defer acc.Unlock() defer acc.Unlock()
for _, checkedMetric := range acc.Metrics { for _, checkedMetric := range acc.Metrics {
// check metric name // filter by metric name
if checkedMetric.Measurement != metricName { if checkedMetric.Measurement != metricName {
continue continue
} }
// check "le" tag // filter by tags
if checkedMetric.Tags[bucketTag] != le { isTagsIdentical := true
continue for tag := range tags {
} if val, ok := checkedMetric.Tags[tag]; !ok || val != tags[tag] {
isTagsIdentical = false
// check fields
isFieldsIdentical := true
for field := range fields {
if _, ok := checkedMetric.Fields[field]; !ok {
isFieldsIdentical = false
break break
} }
} }
if !isFieldsIdentical { if !isTagsIdentical {
continue
}
// filter by field keys
isFieldKeysIdentical := true
for field := range fields {
if _, ok := checkedMetric.Fields[field]; !ok {
isFieldKeysIdentical = false
break
}
}
if !isFieldKeysIdentical {
continue continue
} }
@ -228,8 +296,8 @@ func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricNa
return return
} }
assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", fields, metricName)) assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", checkedMetric.Fields, metricName))
} }
assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, map[string]string{"le": le}, fields)) assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, tags, fields))
} }