Add histogram aggregator plugin ()

This commit is contained in:
Vladislav Mugultyanov 2017-07-31 21:33:51 +03:00 committed by Daniel Nelson
parent 3bd14ed229
commit 9c0aadf445
5 changed files with 680 additions and 0 deletions

View File

@ -606,6 +606,32 @@
# drop_original = false # drop_original = false
# # Configuration for aggregate histogram metrics
# [[aggregators.histogram]]
# ## General Aggregator Arguments:
# ## The period on which to flush & clear the aggregator.
# period = "30s"
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
# ## The example of config to aggregate histogram for all fields of specified metric.
# [[aggregators.histogram.config]]
# ## The set of buckets.
# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
# ## The name of metric.
# metric_name = "cpu"
# ## The example of config to aggregate for specified fields of metric.
# [[aggregators.histogram.config]]
# ## The set of buckets.
# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
# ## The name of metric.
# metric_name = "diskio"
# ## The concrete fields of metric
# metric_fields = ["io_time", "read_time", "write_time"]
############################################################################### ###############################################################################

View File

@ -1,5 +1,6 @@
package all package all
import ( import (
_ ""
_ "" _ ""
) )

View File

@ -0,0 +1,128 @@
# Histogram Aggregator Plugin
#### Goal
This plugin was added for ability to build histograms.
#### Description
The histogram aggregator plugin aggregates values of specified metric's
fields. The metric is emitted every `period` seconds. All you need to do
is to specify borders of histogram buckets and fields, for which you want
to aggregate histogram.
#### How it works
The each metric is passed to the aggregator and this aggregator searches
histogram buckets for those fields, which have been specified in the
config. If buckets are found, the aggregator will put +1 to appropriate
bucket. Otherwise, nothing will happen. Every `period` seconds these data
will be pushed to output.
Note, that the all hits of current bucket will be also added to all next
buckets in final result of distribution. Why does it work this way? In
configuration you define right borders for each bucket in a ascending
sequence. Internally buckets are presented as ranges with borders
(0..bucketBorder]: 0..1, 0..10, 0..50, …, 0..+Inf. So the value "+1" will be
put into those buckets, in which the metric value fell with such ranges of
This plugin creates cumulative histograms. It means, that the hits in the
buckets will always increase from the moment of telegraf start. But if you
restart telegraf, all hits in the buckets will be reset to 0.
Also, the algorithm of hit counting to buckets was implemented on the base
of the algorithm, which is implemented in the Prometheus
### Configuration
# Configuration for aggregate histogram metrics
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The example of config to aggregate histogram for all fields of specified metric.
## The set of buckets.
buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
## The name of metric.
metric_name = "cpu"
## The example of config to aggregate histogram for concrete fields of specified metric.
## The set of buckets.
buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
## The name of metric.
metric_name = "diskio"
## The concrete fields of metric.
metric_fields = ["io_time", "read_time", "write_time"]
#### Explanation
The field `metric_fields` is the list of metric fields. For example, the
metric `cpu` has the following fields: usage_user, usage_system,
usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal,
usage_guest, usage_guest_nice.
Note that histogram metrics will be pushed every `period` seconds.
As you know telegraf calls aggregator `Reset()` func each `period` seconds.
Histogram aggregator ignores `Reset()` and continues to count hits.
#### Use cases
You can specify fields using two cases:
1. The specifying only metric name. In this case all fields of metric
will be aggregated.
2. The specifying metric name and concrete field.
#### Some rules
- The setting of each histogram must be in separate section with title
- The each value of bucket must be float value.
- Don\`t include the border bucket `+Inf`. It will be done automatically.
### Measurements & Fields:
The postfix `bucket` will be added to each field.
- measurement1
- field1_bucket
- field2_bucket
### Tags:
All measurements have tag `le`. This tag has the border value of bucket. It
means that the metric value is less or equal to the value of this tag. For
example, let assume that we have the metric value 10 and the following
buckets: [5, 10, 30, 70, 100]. Then the tag `le` will have the value 10,
because the metrics value is passed into bucket with right border value `10`.
### Example Output:
The following output will return to the Prometheus client.
cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=0i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=20.0 usage_idle_bucket=1i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=30.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=40.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=60.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=70.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=80.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=90.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=2i 1486998330000000000
cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=2i 1486998330000000000

View File

@ -0,0 +1,315 @@
package histogram
import (
// bucketTag is the tag, which contains right bucket border
const bucketTag = "le"
// bucketInf is the right bucket border for infinite values
const bucketInf = "+Inf"
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
type HistogramAggregator struct {
Configs []config `toml:"config"`
buckets bucketsByMetrics
cache map[uint64]metricHistogramCollection
// config is the config, which contains name, field of metric and histogram buckets.
type config struct {
Metric string `toml:"metric_name"`
Fields []string `toml:"metric_fields"`
Buckets buckets `toml:"buckets"`
// bucketsByMetrics contains the buckets grouped by metric and field name
type bucketsByMetrics map[string]bucketsByFields
// bucketsByFields contains the buckets grouped by field name
type bucketsByFields map[string]buckets
// buckets contains the right borders buckets
type buckets []float64
// metricHistogramCollection aggregates the histogram data
type metricHistogramCollection struct {
histogramCollection map[string]counts
name string
tags map[string]string
// counts is the number of hits in the bucket
type counts []int64
// groupedByCountFields contains grouped fields by their count and fields values
type groupedByCountFields struct {
name string
tags map[string]string
fieldsWithCount map[string]int64
// NewHistogramAggregator creates new histogram aggregator
func NewHistogramAggregator() telegraf.Aggregator {
h := &HistogramAggregator{}
h.buckets = make(bucketsByMetrics)
return h
var sampleConfig = `
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The example of config to aggregate histogram for all fields of specified metric.
## The set of buckets.
buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
## The name of metric.
metric_name = "cpu"
## The example of config to aggregate for specified fields of metric.
## The set of buckets.
buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
## The name of metric.
metric_name = "diskio"
## The concrete fields of metric
metric_fields = ["io_time", "read_time", "write_time"]
// SampleConfig returns sample of config
func (h *HistogramAggregator) SampleConfig() string {
return sampleConfig
// Description returns description of aggregator plugin
func (h *HistogramAggregator) Description() string {
return "Keep the aggregate histogram of each metric passing through."
// Add adds new hit to the buckets
func (h *HistogramAggregator) Add(in telegraf.Metric) {
bucketsByField := make(map[string][]float64)
for field := range in.Fields() {
buckets := h.getBuckets(in.Name(), field)
if buckets != nil {
bucketsByField[field] = buckets
if len(bucketsByField) == 0 {
id := in.HashID()
agr, ok := h.cache[id]
if !ok {
agr = metricHistogramCollection{
name: in.Name(),
tags: in.Tags(),
histogramCollection: make(map[string]counts),
for field, value := range in.Fields() {
if buckets, ok := bucketsByField[field]; ok {
if agr.histogramCollection[field] == nil {
agr.histogramCollection[field] = make(counts, len(buckets)+1)
if value, ok := convert(value); ok {
index := sort.SearchFloat64s(buckets, value)
h.cache[id] = agr
// Push returns histogram values for metrics
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
metricsWithGroupedFields := []groupedByCountFields{}
for _, aggregate := range h.cache {
for field, counts := range aggregate.histogramCollection {
h.groupFieldsByBuckets(&metricsWithGroupedFields,, field, copyTags(aggregate.tags), counts)
for _, metric := range metricsWithGroupedFields {
acc.AddFields(, makeFieldsWithCount(metric.fieldsWithCount), metric.tags)
// groupFieldsByBuckets groups fields by metric buckets which are represented as tags
func (h *HistogramAggregator) groupFieldsByBuckets(
metricsWithGroupedFields *[]groupedByCountFields,
name string,
field string,
tags map[string]string,
counts []int64,
) {
count := int64(0)
for index, bucket := range h.getBuckets(name, field) {
count += counts[index]
tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64)
h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags))
count += counts[len(counts)-1]
tags[bucketTag] = bucketInf
h.groupField(metricsWithGroupedFields, name, field, count, tags)
// groupField groups field by count value
func (h *HistogramAggregator) groupField(
metricsWithGroupedFields *[]groupedByCountFields,
name string,
field string,
count int64,
tags map[string]string,
) {
for key, metric := range *metricsWithGroupedFields {
if name == && isTagsIdentical(tags, metric.tags) {
(*metricsWithGroupedFields)[key].fieldsWithCount[field] = count
fieldsWithCount := map[string]int64{
field: count,
*metricsWithGroupedFields = append(
groupedByCountFields{name: name, tags: tags, fieldsWithCount: fieldsWithCount},
// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has
// small value, we will get a histogram with a small amount of the distribution.
func (h *HistogramAggregator) Reset() {}
// resetCache resets cached counts(hits) in the buckets
func (h *HistogramAggregator) resetCache() {
h.cache = make(map[uint64]metricHistogramCollection)
// getBuckets finds buckets and returns them
func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 {
if buckets, ok := h.buckets[metric][field]; ok {
return buckets
for _, config := range h.Configs {
if config.Metric == metric {
if !isBucketExists(field, config) {
if _, ok := h.buckets[metric]; !ok {
h.buckets[metric] = make(bucketsByFields)
h.buckets[metric][field] = sortBuckets(config.Buckets)
return h.buckets[metric][field]
// isBucketExists checks if buckets exists for the passed field
func isBucketExists(field string, cfg config) bool {
if len(cfg.Fields) == 0 {
return true
for _, fl := range cfg.Fields {
if fl == field {
return true
return false
// sortBuckets sorts the buckets if it is needed
func sortBuckets(buckets []float64) []float64 {
for i, bucket := range buckets {
if i < len(buckets)-1 && bucket >= buckets[i+1] {
return buckets
// convert converts interface to concrete type
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
return 0, false
// copyTags copies tags
func copyTags(tags map[string]string) map[string]string {
copiedTags := map[string]string{}
for key, val := range tags {
copiedTags[key] = val
return copiedTags
// isTagsIdentical checks the identity of two list of tags
func isTagsIdentical(originalTags, checkedTags map[string]string) bool {
if len(originalTags) != len(checkedTags) {
return false
for tagName, tagValue := range originalTags {
if tagValue != checkedTags[tagName] {
return false
return true
// makeFieldsWithCount assigns count value to all metric fields
func makeFieldsWithCount(fieldsWithCountIn map[string]int64) map[string]interface{} {
fieldsWithCountOut := map[string]interface{}{}
for field, count := range fieldsWithCountIn {
fieldsWithCountOut[field+"_bucket"] = count
return fieldsWithCountOut
// init initializes histogram aggregator plugin
func init() {
aggregators.Add("histogram", func() telegraf.Aggregator {
return NewHistogramAggregator()

View File

@ -0,0 +1,210 @@
package histogram
import (
// NewTestHistogram creates new test histogram aggregation with specified config
func NewTestHistogram(cfg []config) telegraf.Aggregator {
htm := &HistogramAggregator{Configs: cfg}
htm.buckets = make(bucketsByMetrics)
return htm
// firstMetric1 is the first test metric
var firstMetric1, _ = metric.New(
map[string]string{"tag_name": "tag_value"},
"a": float64(15.3),
"b": float64(40),
// firstMetric1 is the first test metric with other value
var firstMetric2, _ = metric.New(
map[string]string{"tag_name": "tag_value"},
"a": float64(15.9),
"c": float64(40),
// secondMetric is the second metric
var secondMetric, _ = metric.New(
map[string]string{"tag_name": "tag_value"},
"a": float64(105),
"ignoreme": "string",
"andme": true,
// BenchmarkApply runs benchmarks
func BenchmarkApply(b *testing.B) {
histogram := NewHistogramAggregator()
for n := 0; n < b.N; n++ {
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field
func TestHistogramWithPeriodAndOneField(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
acc := &testutil.Accumulator{}
if len(acc.Metrics) != 6 {
assert.Fail(t, "Incorrect number of metrics")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
histogram := NewTestHistogram(cfg)
acc := &testutil.Accumulator{}
if len(acc.Metrics) != 12 {
assert.Fail(t, "Incorrect number of metrics")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, "15.5")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf)
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4")
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10")
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23")
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30")
assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf)
// TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates
// getting added in different periods) for all fields
func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) {
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
acc := &testutil.Accumulator{}
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "20")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "30")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf)
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40")
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf)
// TestWrongBucketsOrder tests the calling panic with incorrect order of buckets
func TestWrongBucketsOrder(t *testing.T) {
defer func() {
if r := recover(); r != nil {
"histogram buckets must be in increasing order: 90.00 >= 20.00, metrics: first_metric_name, field: a",
var cfg []config
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
histogram := NewTestHistogram(cfg)
// assertContainsTaggedField is help functions to test histogram data
func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, le string) {
defer acc.Unlock()
for _, checkedMetric := range acc.Metrics {
// check metric name
if checkedMetric.Measurement != metricName {
// check "le" tag
if checkedMetric.Tags[bucketTag] != le {
// check fields
isFieldsIdentical := true
for field := range fields {
if _, ok := checkedMetric.Fields[field]; !ok {
isFieldsIdentical = false
if !isFieldsIdentical {
// check fields with their counts
if assert.Equal(t, fields, checkedMetric.Fields) {
assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", fields, metricName))
assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, map[string]string{"le": le}, fields))