From 265a558958c877631032c272ab4ac65efaf497dd Mon Sep 17 00:00:00 2001 From: Vladislav Mugultyanov Date: Mon, 31 Jul 2017 21:33:51 +0300 Subject: [PATCH] Add histogram aggregator plugin (#2387) --- etc/telegraf.conf | 26 ++ plugins/aggregators/all/all.go | 1 + plugins/aggregators/histogram/README.md | 128 +++++++ plugins/aggregators/histogram/histogram.go | 315 ++++++++++++++++++ .../aggregators/histogram/histogram_test.go | 210 ++++++++++++ 5 files changed, 680 insertions(+) create mode 100644 plugins/aggregators/histogram/README.md create mode 100644 plugins/aggregators/histogram/histogram.go create mode 100644 plugins/aggregators/histogram/histogram_test.go diff --git a/etc/telegraf.conf b/etc/telegraf.conf index b7999b3e0..3e1395c02 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -606,6 +606,32 @@ # drop_original = false +# # Configuration for aggregate histogram metrics +# [[aggregators.histogram]] +# ## General Aggregator Arguments: +# ## The period on which to flush & clear the aggregator. +# period = "30s" +# ## If true, the original metric will be dropped by the +# ## aggregator and will not get sent to the output plugins. +# drop_original = false +# +# ## The example of config to aggregate histogram for all fields of specified metric. +# [[aggregators.histogram.config]] +# ## The set of buckets. +# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] +# ## The name of metric. +# metric_name = "cpu" +# +# ## The example of config to aggregate for specified fields of metric. +# [[aggregators.histogram.config]] +# ## The set of buckets. +# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] +# ## The name of metric. +# metric_name = "diskio" +# ## The concrete fields of metric +# metric_fields = ["io_time", "read_time", "write_time"] + + ############################################################################### # INPUT PLUGINS # diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index 1041a0c9c..c4d430cc9 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -1,5 +1,6 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" ) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md new file mode 100644 index 000000000..29b7a6dc0 --- /dev/null +++ b/plugins/aggregators/histogram/README.md @@ -0,0 +1,128 @@ +# Histogram Aggregator Plugin + +#### Goal + +This plugin was added for ability to build histograms. + +#### Description + +The histogram aggregator plugin aggregates values of specified metric's +fields. The metric is emitted every `period` seconds. All you need to do +is to specify borders of histogram buckets and fields, for which you want +to aggregate histogram. + +#### How it works + +The each metric is passed to the aggregator and this aggregator searches +histogram buckets for those fields, which have been specified in the +config. If buckets are found, the aggregator will put +1 to appropriate +bucket. Otherwise, nothing will happen. Every `period` seconds these data +will be pushed to output. + +Note, that the all hits of current bucket will be also added to all next +buckets in final result of distribution. Why does it work this way? In +configuration you define right borders for each bucket in a ascending +sequence. Internally buckets are presented as ranges with borders +(0..bucketBorder]: 0..1, 0..10, 0..50, …, 0..+Inf. So the value "+1" will be +put into those buckets, in which the metric value fell with such ranges of +buckets. + +This plugin creates cumulative histograms. It means, that the hits in the +buckets will always increase from the moment of telegraf start. But if you +restart telegraf, all hits in the buckets will be reset to 0. + +Also, the algorithm of hit counting to buckets was implemented on the base +of the algorithm, which is implemented in the Prometheus +[client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). + +### Configuration + +```toml +# Configuration for aggregate histogram metrics +[[aggregators.histogram]] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The example of config to aggregate histogram for all fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] + ## The name of metric. + metric_name = "cpu" + + ## The example of config to aggregate histogram for concrete fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "diskio" + ## The concrete fields of metric. + metric_fields = ["io_time", "read_time", "write_time"] +``` + +#### Explanation + +The field `metric_fields` is the list of metric fields. For example, the +metric `cpu` has the following fields: usage_user, usage_system, +usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal, +usage_guest, usage_guest_nice. + +Note that histogram metrics will be pushed every `period` seconds. +As you know telegraf calls aggregator `Reset()` func each `period` seconds. +Histogram aggregator ignores `Reset()` and continues to count hits. + +#### Use cases + +You can specify fields using two cases: + + 1. The specifying only metric name. In this case all fields of metric + will be aggregated. + 2. The specifying metric name and concrete field. + +#### Some rules + + - The setting of each histogram must be in separate section with title + `aggregators.histogram.config`. + + - The each value of bucket must be float value. + + - Don\`t include the border bucket `+Inf`. It will be done automatically. + +### Measurements & Fields: + +The postfix `bucket` will be added to each field. + +- measurement1 + - field1_bucket + - field2_bucket + +### Tags: + +All measurements have tag `le`. This tag has the border value of bucket. It +means that the metric value is less or equal to the value of this tag. For +example, let assume that we have the metric value 10 and the following +buckets: [5, 10, 30, 70, 100]. Then the tag `le` will have the value 10, +because the metrics value is passed into bucket with right border value `10`. + +### Example Output: + +The following output will return to the Prometheus client. + +``` +cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=0i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=20.0 usage_idle_bucket=1i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=30.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=40.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=60.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=70.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=80.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=90.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=2i 1486998330000000000 +``` diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go new file mode 100644 index 000000000..491955339 --- /dev/null +++ b/plugins/aggregators/histogram/histogram.go @@ -0,0 +1,315 @@ +package histogram + +import ( + "sort" + "strconv" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +// bucketTag is the tag, which contains right bucket border +const bucketTag = "le" + +// bucketInf is the right bucket border for infinite values +const bucketInf = "+Inf" + +// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics +type HistogramAggregator struct { + Configs []config `toml:"config"` + + buckets bucketsByMetrics + cache map[uint64]metricHistogramCollection +} + +// config is the config, which contains name, field of metric and histogram buckets. +type config struct { + Metric string `toml:"metric_name"` + Fields []string `toml:"metric_fields"` + Buckets buckets `toml:"buckets"` +} + +// bucketsByMetrics contains the buckets grouped by metric and field name +type bucketsByMetrics map[string]bucketsByFields + +// bucketsByFields contains the buckets grouped by field name +type bucketsByFields map[string]buckets + +// buckets contains the right borders buckets +type buckets []float64 + +// metricHistogramCollection aggregates the histogram data +type metricHistogramCollection struct { + histogramCollection map[string]counts + name string + tags map[string]string +} + +// counts is the number of hits in the bucket +type counts []int64 + +// groupedByCountFields contains grouped fields by their count and fields values +type groupedByCountFields struct { + name string + tags map[string]string + fieldsWithCount map[string]int64 +} + +// NewHistogramAggregator creates new histogram aggregator +func NewHistogramAggregator() telegraf.Aggregator { + h := &HistogramAggregator{} + h.buckets = make(bucketsByMetrics) + h.resetCache() + + return h +} + +var sampleConfig = ` + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The example of config to aggregate histogram for all fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] + ## The name of metric. + metric_name = "cpu" + + ## The example of config to aggregate for specified fields of metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "diskio" + ## The concrete fields of metric + metric_fields = ["io_time", "read_time", "write_time"] +` + +// SampleConfig returns sample of config +func (h *HistogramAggregator) SampleConfig() string { + return sampleConfig +} + +// Description returns description of aggregator plugin +func (h *HistogramAggregator) Description() string { + return "Keep the aggregate histogram of each metric passing through." +} + +// Add adds new hit to the buckets +func (h *HistogramAggregator) Add(in telegraf.Metric) { + bucketsByField := make(map[string][]float64) + for field := range in.Fields() { + buckets := h.getBuckets(in.Name(), field) + if buckets != nil { + bucketsByField[field] = buckets + } + } + + if len(bucketsByField) == 0 { + return + } + + id := in.HashID() + agr, ok := h.cache[id] + if !ok { + agr = metricHistogramCollection{ + name: in.Name(), + tags: in.Tags(), + histogramCollection: make(map[string]counts), + } + } + + for field, value := range in.Fields() { + if buckets, ok := bucketsByField[field]; ok { + if agr.histogramCollection[field] == nil { + agr.histogramCollection[field] = make(counts, len(buckets)+1) + } + + if value, ok := convert(value); ok { + index := sort.SearchFloat64s(buckets, value) + agr.histogramCollection[field][index]++ + } + } + } + + h.cache[id] = agr +} + +// Push returns histogram values for metrics +func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { + metricsWithGroupedFields := []groupedByCountFields{} + + for _, aggregate := range h.cache { + for field, counts := range aggregate.histogramCollection { + h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) + } + } + + for _, metric := range metricsWithGroupedFields { + acc.AddFields(metric.name, makeFieldsWithCount(metric.fieldsWithCount), metric.tags) + } +} + +// groupFieldsByBuckets groups fields by metric buckets which are represented as tags +func (h *HistogramAggregator) groupFieldsByBuckets( + metricsWithGroupedFields *[]groupedByCountFields, + name string, + field string, + tags map[string]string, + counts []int64, +) { + count := int64(0) + for index, bucket := range h.getBuckets(name, field) { + count += counts[index] + + tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64) + h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags)) + } + + count += counts[len(counts)-1] + tags[bucketTag] = bucketInf + + h.groupField(metricsWithGroupedFields, name, field, count, tags) +} + +// groupField groups field by count value +func (h *HistogramAggregator) groupField( + metricsWithGroupedFields *[]groupedByCountFields, + name string, + field string, + count int64, + tags map[string]string, +) { + for key, metric := range *metricsWithGroupedFields { + if name == metric.name && isTagsIdentical(tags, metric.tags) { + (*metricsWithGroupedFields)[key].fieldsWithCount[field] = count + return + } + } + + fieldsWithCount := map[string]int64{ + field: count, + } + + *metricsWithGroupedFields = append( + *metricsWithGroupedFields, + groupedByCountFields{name: name, tags: tags, fieldsWithCount: fieldsWithCount}, + ) +} + +// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has +// small value, we will get a histogram with a small amount of the distribution. +func (h *HistogramAggregator) Reset() {} + +// resetCache resets cached counts(hits) in the buckets +func (h *HistogramAggregator) resetCache() { + h.cache = make(map[uint64]metricHistogramCollection) +} + +// getBuckets finds buckets and returns them +func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 { + if buckets, ok := h.buckets[metric][field]; ok { + return buckets + } + + for _, config := range h.Configs { + if config.Metric == metric { + if !isBucketExists(field, config) { + continue + } + + if _, ok := h.buckets[metric]; !ok { + h.buckets[metric] = make(bucketsByFields) + } + + h.buckets[metric][field] = sortBuckets(config.Buckets) + } + } + + return h.buckets[metric][field] +} + +// isBucketExists checks if buckets exists for the passed field +func isBucketExists(field string, cfg config) bool { + if len(cfg.Fields) == 0 { + return true + } + + for _, fl := range cfg.Fields { + if fl == field { + return true + } + } + + return false +} + +// sortBuckets sorts the buckets if it is needed +func sortBuckets(buckets []float64) []float64 { + for i, bucket := range buckets { + if i < len(buckets)-1 && bucket >= buckets[i+1] { + sort.Float64s(buckets) + break + } + } + + return buckets +} + +// convert converts interface to concrete type +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { + case float64: + return v, true + case int64: + return float64(v), true + default: + return 0, false + } +} + +// copyTags copies tags +func copyTags(tags map[string]string) map[string]string { + copiedTags := map[string]string{} + for key, val := range tags { + copiedTags[key] = val + } + + return copiedTags +} + +// isTagsIdentical checks the identity of two list of tags +func isTagsIdentical(originalTags, checkedTags map[string]string) bool { + if len(originalTags) != len(checkedTags) { + return false + } + + for tagName, tagValue := range originalTags { + if tagValue != checkedTags[tagName] { + return false + } + } + + return true +} + +// makeFieldsWithCount assigns count value to all metric fields +func makeFieldsWithCount(fieldsWithCountIn map[string]int64) map[string]interface{} { + fieldsWithCountOut := map[string]interface{}{} + for field, count := range fieldsWithCountIn { + fieldsWithCountOut[field+"_bucket"] = count + } + + return fieldsWithCountOut +} + +// init initializes histogram aggregator plugin +func init() { + aggregators.Add("histogram", func() telegraf.Aggregator { + return NewHistogramAggregator() + }) +} diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go new file mode 100644 index 000000000..8c4a2b9d3 --- /dev/null +++ b/plugins/aggregators/histogram/histogram_test.go @@ -0,0 +1,210 @@ +package histogram + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +// NewTestHistogram creates new test histogram aggregation with specified config +func NewTestHistogram(cfg []config) telegraf.Aggregator { + htm := &HistogramAggregator{Configs: cfg} + htm.buckets = make(bucketsByMetrics) + htm.resetCache() + + return htm +} + +// firstMetric1 is the first test metric +var firstMetric1, _ = metric.New( + "first_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(15.3), + "b": float64(40), + }, + time.Now(), +) + +// firstMetric1 is the first test metric with other value +var firstMetric2, _ = metric.New( + "first_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(15.9), + "c": float64(40), + }, + time.Now(), +) + +// secondMetric is the second metric +var secondMetric, _ = metric.New( + "second_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(105), + "ignoreme": "string", + "andme": true, + }, + time.Now(), +) + +// BenchmarkApply runs benchmarks +func BenchmarkApply(b *testing.B) { + histogram := NewHistogramAggregator() + + for n := 0; n < b.N; n++ { + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Add(secondMetric) + } +} + +// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field +func TestHistogramWithPeriodAndOneField(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Push(acc) + + if len(acc.Metrics) != 6 { + assert.Fail(t, "Incorrect number of metrics") + } + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) +} + +// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields +func TestHistogramWithPeriodAndAllFields(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) + cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Add(secondMetric) + histogram.Push(acc) + + if len(acc.Metrics) != 12 { + assert.Fail(t, "Incorrect number of metrics") + } + + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf) +} + +// TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates +// getting added in different periods) for all fields +func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { + + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + histogram.Add(firstMetric1) + histogram.Push(acc) + + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf) + + acc.ClearMetrics() + histogram.Add(firstMetric2) + histogram.Push(acc) + + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) +} + +// TestWrongBucketsOrder tests the calling panic with incorrect order of buckets +func TestWrongBucketsOrder(t *testing.T) { + defer func() { + if r := recover(); r != nil { + assert.Equal( + t, + "histogram buckets must be in increasing order: 90.00 >= 20.00, metrics: first_metric_name, field: a", + fmt.Sprint(r), + ) + } + }() + + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + histogram.Add(firstMetric2) +} + +// assertContainsTaggedField is help functions to test histogram data +func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, le string) { + acc.Lock() + defer acc.Unlock() + + for _, checkedMetric := range acc.Metrics { + // check metric name + if checkedMetric.Measurement != metricName { + continue + } + + // check "le" tag + if checkedMetric.Tags[bucketTag] != le { + continue + } + + // check fields + isFieldsIdentical := true + for field := range fields { + if _, ok := checkedMetric.Fields[field]; !ok { + isFieldsIdentical = false + break + } + } + if !isFieldsIdentical { + continue + } + + // check fields with their counts + if assert.Equal(t, fields, checkedMetric.Fields) { + return + } + + assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", fields, metricName)) + } + + assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, map[string]string{"le": le}, fields)) +}