Add histogram aggregator plugin (#2387)
This commit is contained in:
parent
9dad79eeb4
commit
265a558958
|
@ -606,6 +606,32 @@
|
||||||
# drop_original = false
|
# drop_original = false
|
||||||
|
|
||||||
|
|
||||||
|
# # Configuration for aggregate histogram metrics
|
||||||
|
# [[aggregators.histogram]]
|
||||||
|
# ## General Aggregator Arguments:
|
||||||
|
# ## The period on which to flush & clear the aggregator.
|
||||||
|
# period = "30s"
|
||||||
|
# ## If true, the original metric will be dropped by the
|
||||||
|
# ## aggregator and will not get sent to the output plugins.
|
||||||
|
# drop_original = false
|
||||||
|
#
|
||||||
|
# ## The example of config to aggregate histogram for all fields of specified metric.
|
||||||
|
# [[aggregators.histogram.config]]
|
||||||
|
# ## The set of buckets.
|
||||||
|
# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
|
||||||
|
# ## The name of metric.
|
||||||
|
# metric_name = "cpu"
|
||||||
|
#
|
||||||
|
# ## The example of config to aggregate for specified fields of metric.
|
||||||
|
# [[aggregators.histogram.config]]
|
||||||
|
# ## The set of buckets.
|
||||||
|
# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
|
||||||
|
# ## The name of metric.
|
||||||
|
# metric_name = "diskio"
|
||||||
|
# ## The concrete fields of metric
|
||||||
|
# metric_fields = ["io_time", "read_time", "write_time"]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# INPUT PLUGINS #
|
# INPUT PLUGINS #
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package all
|
package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
|
||||||
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
|
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
# Histogram Aggregator Plugin
|
||||||
|
|
||||||
|
#### Goal
|
||||||
|
|
||||||
|
This plugin was added for ability to build histograms.
|
||||||
|
|
||||||
|
#### Description
|
||||||
|
|
||||||
|
The histogram aggregator plugin aggregates values of specified metric's
|
||||||
|
fields. The metric is emitted every `period` seconds. All you need to do
|
||||||
|
is to specify borders of histogram buckets and fields, for which you want
|
||||||
|
to aggregate histogram.
|
||||||
|
|
||||||
|
#### How it works
|
||||||
|
|
||||||
|
The each metric is passed to the aggregator and this aggregator searches
|
||||||
|
histogram buckets for those fields, which have been specified in the
|
||||||
|
config. If buckets are found, the aggregator will put +1 to appropriate
|
||||||
|
bucket. Otherwise, nothing will happen. Every `period` seconds these data
|
||||||
|
will be pushed to output.
|
||||||
|
|
||||||
|
Note, that the all hits of current bucket will be also added to all next
|
||||||
|
buckets in final result of distribution. Why does it work this way? In
|
||||||
|
configuration you define right borders for each bucket in a ascending
|
||||||
|
sequence. Internally buckets are presented as ranges with borders
|
||||||
|
(0..bucketBorder]: 0..1, 0..10, 0..50, …, 0..+Inf. So the value "+1" will be
|
||||||
|
put into those buckets, in which the metric value fell with such ranges of
|
||||||
|
buckets.
|
||||||
|
|
||||||
|
This plugin creates cumulative histograms. It means, that the hits in the
|
||||||
|
buckets will always increase from the moment of telegraf start. But if you
|
||||||
|
restart telegraf, all hits in the buckets will be reset to 0.
|
||||||
|
|
||||||
|
Also, the algorithm of hit counting to buckets was implemented on the base
|
||||||
|
of the algorithm, which is implemented in the Prometheus
|
||||||
|
[client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go).
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Configuration for aggregate histogram metrics
|
||||||
|
[[aggregators.histogram]]
|
||||||
|
## General Aggregator Arguments:
|
||||||
|
## The period on which to flush & clear the aggregator.
|
||||||
|
period = "30s"
|
||||||
|
## If true, the original metric will be dropped by the
|
||||||
|
## aggregator and will not get sent to the output plugins.
|
||||||
|
drop_original = false
|
||||||
|
|
||||||
|
## The example of config to aggregate histogram for all fields of specified metric.
|
||||||
|
[[aggregators.histogram.config]]
|
||||||
|
## The set of buckets.
|
||||||
|
buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
|
||||||
|
## The name of metric.
|
||||||
|
metric_name = "cpu"
|
||||||
|
|
||||||
|
## The example of config to aggregate histogram for concrete fields of specified metric.
|
||||||
|
[[aggregators.histogram.config]]
|
||||||
|
## The set of buckets.
|
||||||
|
buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
|
||||||
|
## The name of metric.
|
||||||
|
metric_name = "diskio"
|
||||||
|
## The concrete fields of metric.
|
||||||
|
metric_fields = ["io_time", "read_time", "write_time"]
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Explanation
|
||||||
|
|
||||||
|
The field `metric_fields` is the list of metric fields. For example, the
|
||||||
|
metric `cpu` has the following fields: usage_user, usage_system,
|
||||||
|
usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal,
|
||||||
|
usage_guest, usage_guest_nice.
|
||||||
|
|
||||||
|
Note that histogram metrics will be pushed every `period` seconds.
|
||||||
|
As you know telegraf calls aggregator `Reset()` func each `period` seconds.
|
||||||
|
Histogram aggregator ignores `Reset()` and continues to count hits.
|
||||||
|
|
||||||
|
#### Use cases
|
||||||
|
|
||||||
|
You can specify fields using two cases:
|
||||||
|
|
||||||
|
1. The specifying only metric name. In this case all fields of metric
|
||||||
|
will be aggregated.
|
||||||
|
2. The specifying metric name and concrete field.
|
||||||
|
|
||||||
|
#### Some rules
|
||||||
|
|
||||||
|
- The setting of each histogram must be in separate section with title
|
||||||
|
`aggregators.histogram.config`.
|
||||||
|
|
||||||
|
- The each value of bucket must be float value.
|
||||||
|
|
||||||
|
- Don\`t include the border bucket `+Inf`. It will be done automatically.
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
The postfix `bucket` will be added to each field.
|
||||||
|
|
||||||
|
- measurement1
|
||||||
|
- field1_bucket
|
||||||
|
- field2_bucket
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
All measurements have tag `le`. This tag has the border value of bucket. It
|
||||||
|
means that the metric value is less or equal to the value of this tag. For
|
||||||
|
example, let assume that we have the metric value 10 and the following
|
||||||
|
buckets: [5, 10, 30, 70, 100]. Then the tag `le` will have the value 10,
|
||||||
|
because the metrics value is passed into bucket with right border value `10`.
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
The following output will return to the Prometheus client.
|
||||||
|
|
||||||
|
```
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=0i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=20.0 usage_idle_bucket=1i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=30.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=40.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=60.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=70.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=80.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=90.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=2i 1486998330000000000
|
||||||
|
cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=2i 1486998330000000000
|
||||||
|
```
|
|
@ -0,0 +1,315 @@
|
||||||
|
package histogram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/aggregators"
|
||||||
|
)
|
||||||
|
|
||||||
|
// bucketTag is the tag, which contains right bucket border
|
||||||
|
const bucketTag = "le"
|
||||||
|
|
||||||
|
// bucketInf is the right bucket border for infinite values
|
||||||
|
const bucketInf = "+Inf"
|
||||||
|
|
||||||
|
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
|
||||||
|
type HistogramAggregator struct {
|
||||||
|
Configs []config `toml:"config"`
|
||||||
|
|
||||||
|
buckets bucketsByMetrics
|
||||||
|
cache map[uint64]metricHistogramCollection
|
||||||
|
}
|
||||||
|
|
||||||
|
// config is the config, which contains name, field of metric and histogram buckets.
|
||||||
|
type config struct {
|
||||||
|
Metric string `toml:"metric_name"`
|
||||||
|
Fields []string `toml:"metric_fields"`
|
||||||
|
Buckets buckets `toml:"buckets"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// bucketsByMetrics contains the buckets grouped by metric and field name
|
||||||
|
type bucketsByMetrics map[string]bucketsByFields
|
||||||
|
|
||||||
|
// bucketsByFields contains the buckets grouped by field name
|
||||||
|
type bucketsByFields map[string]buckets
|
||||||
|
|
||||||
|
// buckets contains the right borders buckets
|
||||||
|
type buckets []float64
|
||||||
|
|
||||||
|
// metricHistogramCollection aggregates the histogram data
|
||||||
|
type metricHistogramCollection struct {
|
||||||
|
histogramCollection map[string]counts
|
||||||
|
name string
|
||||||
|
tags map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// counts is the number of hits in the bucket
|
||||||
|
type counts []int64
|
||||||
|
|
||||||
|
// groupedByCountFields contains grouped fields by their count and fields values
|
||||||
|
type groupedByCountFields struct {
|
||||||
|
name string
|
||||||
|
tags map[string]string
|
||||||
|
fieldsWithCount map[string]int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHistogramAggregator creates new histogram aggregator
|
||||||
|
func NewHistogramAggregator() telegraf.Aggregator {
|
||||||
|
h := &HistogramAggregator{}
|
||||||
|
h.buckets = make(bucketsByMetrics)
|
||||||
|
h.resetCache()
|
||||||
|
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## General Aggregator Arguments:
|
||||||
|
## The period on which to flush & clear the aggregator.
|
||||||
|
period = "30s"
|
||||||
|
## If true, the original metric will be dropped by the
|
||||||
|
## aggregator and will not get sent to the output plugins.
|
||||||
|
drop_original = false
|
||||||
|
|
||||||
|
## The example of config to aggregate histogram for all fields of specified metric.
|
||||||
|
[[aggregators.histogram.config]]
|
||||||
|
## The set of buckets.
|
||||||
|
buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
|
||||||
|
## The name of metric.
|
||||||
|
metric_name = "cpu"
|
||||||
|
|
||||||
|
## The example of config to aggregate for specified fields of metric.
|
||||||
|
[[aggregators.histogram.config]]
|
||||||
|
## The set of buckets.
|
||||||
|
buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
|
||||||
|
## The name of metric.
|
||||||
|
metric_name = "diskio"
|
||||||
|
## The concrete fields of metric
|
||||||
|
metric_fields = ["io_time", "read_time", "write_time"]
|
||||||
|
`
|
||||||
|
|
||||||
|
// SampleConfig returns sample of config
|
||||||
|
func (h *HistogramAggregator) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// Description returns description of aggregator plugin
|
||||||
|
func (h *HistogramAggregator) Description() string {
|
||||||
|
return "Keep the aggregate histogram of each metric passing through."
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds new hit to the buckets
|
||||||
|
func (h *HistogramAggregator) Add(in telegraf.Metric) {
|
||||||
|
bucketsByField := make(map[string][]float64)
|
||||||
|
for field := range in.Fields() {
|
||||||
|
buckets := h.getBuckets(in.Name(), field)
|
||||||
|
if buckets != nil {
|
||||||
|
bucketsByField[field] = buckets
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(bucketsByField) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id := in.HashID()
|
||||||
|
agr, ok := h.cache[id]
|
||||||
|
if !ok {
|
||||||
|
agr = metricHistogramCollection{
|
||||||
|
name: in.Name(),
|
||||||
|
tags: in.Tags(),
|
||||||
|
histogramCollection: make(map[string]counts),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for field, value := range in.Fields() {
|
||||||
|
if buckets, ok := bucketsByField[field]; ok {
|
||||||
|
if agr.histogramCollection[field] == nil {
|
||||||
|
agr.histogramCollection[field] = make(counts, len(buckets)+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, ok := convert(value); ok {
|
||||||
|
index := sort.SearchFloat64s(buckets, value)
|
||||||
|
agr.histogramCollection[field][index]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h.cache[id] = agr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push returns histogram values for metrics
|
||||||
|
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
|
||||||
|
metricsWithGroupedFields := []groupedByCountFields{}
|
||||||
|
|
||||||
|
for _, aggregate := range h.cache {
|
||||||
|
for field, counts := range aggregate.histogramCollection {
|
||||||
|
h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, metric := range metricsWithGroupedFields {
|
||||||
|
acc.AddFields(metric.name, makeFieldsWithCount(metric.fieldsWithCount), metric.tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// groupFieldsByBuckets groups fields by metric buckets which are represented as tags
|
||||||
|
func (h *HistogramAggregator) groupFieldsByBuckets(
|
||||||
|
metricsWithGroupedFields *[]groupedByCountFields,
|
||||||
|
name string,
|
||||||
|
field string,
|
||||||
|
tags map[string]string,
|
||||||
|
counts []int64,
|
||||||
|
) {
|
||||||
|
count := int64(0)
|
||||||
|
for index, bucket := range h.getBuckets(name, field) {
|
||||||
|
count += counts[index]
|
||||||
|
|
||||||
|
tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64)
|
||||||
|
h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags))
|
||||||
|
}
|
||||||
|
|
||||||
|
count += counts[len(counts)-1]
|
||||||
|
tags[bucketTag] = bucketInf
|
||||||
|
|
||||||
|
h.groupField(metricsWithGroupedFields, name, field, count, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// groupField groups field by count value
|
||||||
|
func (h *HistogramAggregator) groupField(
|
||||||
|
metricsWithGroupedFields *[]groupedByCountFields,
|
||||||
|
name string,
|
||||||
|
field string,
|
||||||
|
count int64,
|
||||||
|
tags map[string]string,
|
||||||
|
) {
|
||||||
|
for key, metric := range *metricsWithGroupedFields {
|
||||||
|
if name == metric.name && isTagsIdentical(tags, metric.tags) {
|
||||||
|
(*metricsWithGroupedFields)[key].fieldsWithCount[field] = count
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fieldsWithCount := map[string]int64{
|
||||||
|
field: count,
|
||||||
|
}
|
||||||
|
|
||||||
|
*metricsWithGroupedFields = append(
|
||||||
|
*metricsWithGroupedFields,
|
||||||
|
groupedByCountFields{name: name, tags: tags, fieldsWithCount: fieldsWithCount},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has
|
||||||
|
// small value, we will get a histogram with a small amount of the distribution.
|
||||||
|
func (h *HistogramAggregator) Reset() {}
|
||||||
|
|
||||||
|
// resetCache resets cached counts(hits) in the buckets
|
||||||
|
func (h *HistogramAggregator) resetCache() {
|
||||||
|
h.cache = make(map[uint64]metricHistogramCollection)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getBuckets finds buckets and returns them
|
||||||
|
func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 {
|
||||||
|
if buckets, ok := h.buckets[metric][field]; ok {
|
||||||
|
return buckets
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, config := range h.Configs {
|
||||||
|
if config.Metric == metric {
|
||||||
|
if !isBucketExists(field, config) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := h.buckets[metric]; !ok {
|
||||||
|
h.buckets[metric] = make(bucketsByFields)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.buckets[metric][field] = sortBuckets(config.Buckets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return h.buckets[metric][field]
|
||||||
|
}
|
||||||
|
|
||||||
|
// isBucketExists checks if buckets exists for the passed field
|
||||||
|
func isBucketExists(field string, cfg config) bool {
|
||||||
|
if len(cfg.Fields) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fl := range cfg.Fields {
|
||||||
|
if fl == field {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// sortBuckets sorts the buckets if it is needed
|
||||||
|
func sortBuckets(buckets []float64) []float64 {
|
||||||
|
for i, bucket := range buckets {
|
||||||
|
if i < len(buckets)-1 && bucket >= buckets[i+1] {
|
||||||
|
sort.Float64s(buckets)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return buckets
|
||||||
|
}
|
||||||
|
|
||||||
|
// convert converts interface to concrete type
|
||||||
|
func convert(in interface{}) (float64, bool) {
|
||||||
|
switch v := in.(type) {
|
||||||
|
case float64:
|
||||||
|
return v, true
|
||||||
|
case int64:
|
||||||
|
return float64(v), true
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// copyTags copies tags
|
||||||
|
func copyTags(tags map[string]string) map[string]string {
|
||||||
|
copiedTags := map[string]string{}
|
||||||
|
for key, val := range tags {
|
||||||
|
copiedTags[key] = val
|
||||||
|
}
|
||||||
|
|
||||||
|
return copiedTags
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTagsIdentical checks the identity of two list of tags
|
||||||
|
func isTagsIdentical(originalTags, checkedTags map[string]string) bool {
|
||||||
|
if len(originalTags) != len(checkedTags) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for tagName, tagValue := range originalTags {
|
||||||
|
if tagValue != checkedTags[tagName] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeFieldsWithCount assigns count value to all metric fields
|
||||||
|
func makeFieldsWithCount(fieldsWithCountIn map[string]int64) map[string]interface{} {
|
||||||
|
fieldsWithCountOut := map[string]interface{}{}
|
||||||
|
for field, count := range fieldsWithCountIn {
|
||||||
|
fieldsWithCountOut[field+"_bucket"] = count
|
||||||
|
}
|
||||||
|
|
||||||
|
return fieldsWithCountOut
|
||||||
|
}
|
||||||
|
|
||||||
|
// init initializes histogram aggregator plugin
|
||||||
|
func init() {
|
||||||
|
aggregators.Add("histogram", func() telegraf.Aggregator {
|
||||||
|
return NewHistogramAggregator()
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,210 @@
|
||||||
|
package histogram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewTestHistogram creates new test histogram aggregation with specified config
|
||||||
|
func NewTestHistogram(cfg []config) telegraf.Aggregator {
|
||||||
|
htm := &HistogramAggregator{Configs: cfg}
|
||||||
|
htm.buckets = make(bucketsByMetrics)
|
||||||
|
htm.resetCache()
|
||||||
|
|
||||||
|
return htm
|
||||||
|
}
|
||||||
|
|
||||||
|
// firstMetric1 is the first test metric
|
||||||
|
var firstMetric1, _ = metric.New(
|
||||||
|
"first_metric_name",
|
||||||
|
map[string]string{"tag_name": "tag_value"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(15.3),
|
||||||
|
"b": float64(40),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
// firstMetric1 is the first test metric with other value
|
||||||
|
var firstMetric2, _ = metric.New(
|
||||||
|
"first_metric_name",
|
||||||
|
map[string]string{"tag_name": "tag_value"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(15.9),
|
||||||
|
"c": float64(40),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
// secondMetric is the second metric
|
||||||
|
var secondMetric, _ = metric.New(
|
||||||
|
"second_metric_name",
|
||||||
|
map[string]string{"tag_name": "tag_value"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(105),
|
||||||
|
"ignoreme": "string",
|
||||||
|
"andme": true,
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
// BenchmarkApply runs benchmarks
|
||||||
|
func BenchmarkApply(b *testing.B) {
|
||||||
|
histogram := NewHistogramAggregator()
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Add(secondMetric)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field
|
||||||
|
func TestHistogramWithPeriodAndOneField(t *testing.T) {
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
|
histogram := NewTestHistogram(cfg)
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
if len(acc.Metrics) != 6 {
|
||||||
|
assert.Fail(t, "Incorrect number of metrics")
|
||||||
|
}
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
|
||||||
|
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
|
||||||
|
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
|
||||||
|
histogram := NewTestHistogram(cfg)
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Add(secondMetric)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
if len(acc.Metrics) != 12 {
|
||||||
|
assert.Fail(t, "Incorrect number of metrics")
|
||||||
|
}
|
||||||
|
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, "15.5")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf)
|
||||||
|
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates
|
||||||
|
// getting added in different periods) for all fields
|
||||||
|
func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) {
|
||||||
|
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
|
||||||
|
histogram := NewTestHistogram(cfg)
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
histogram.Add(firstMetric1)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "20")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf)
|
||||||
|
|
||||||
|
acc.ClearMetrics()
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
histogram.Push(acc)
|
||||||
|
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40")
|
||||||
|
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestWrongBucketsOrder tests the calling panic with incorrect order of buckets
|
||||||
|
func TestWrongBucketsOrder(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
"histogram buckets must be in increasing order: 90.00 >= 20.00, metrics: first_metric_name, field: a",
|
||||||
|
fmt.Sprint(r),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var cfg []config
|
||||||
|
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
|
||||||
|
histogram := NewTestHistogram(cfg)
|
||||||
|
histogram.Add(firstMetric2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertContainsTaggedField is help functions to test histogram data
|
||||||
|
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, le string) {
|
||||||
|
acc.Lock()
|
||||||
|
defer acc.Unlock()
|
||||||
|
|
||||||
|
for _, checkedMetric := range acc.Metrics {
|
||||||
|
// check metric name
|
||||||
|
if checkedMetric.Measurement != metricName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check "le" tag
|
||||||
|
if checkedMetric.Tags[bucketTag] != le {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check fields
|
||||||
|
isFieldsIdentical := true
|
||||||
|
for field := range fields {
|
||||||
|
if _, ok := checkedMetric.Fields[field]; !ok {
|
||||||
|
isFieldsIdentical = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !isFieldsIdentical {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check fields with their counts
|
||||||
|
if assert.Equal(t, fields, checkedMetric.Fields) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", fields, metricName))
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, map[string]string{"le": le}, fields))
|
||||||
|
}
|
Loading…
Reference in New Issue