package histogram

import (
	"sort"
	"strconv"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
)

// bucketTag is the name of the tag that carries the right border of a bucket.
const bucketTag = "le"

// bucketInf is the bucketTag value of the last bucket, which has no finite right border.
const bucketInf = "+Inf"

// HistogramAggregator is an aggregator that holds the histogram configs and
// the histograms collected so far for the configured metrics.
type HistogramAggregator struct {
	// Configs is the user supplied list of histogram definitions.
	Configs []config `toml:"config"`

	// buckets caches the resolved (sorted) bucket borders per metric and field.
	buckets bucketsByMetrics
	// cache holds the collected hit counts keyed by the metric hash id.
	cache map[uint64]metricHistogramCollection
}

// config is a single histogram definition: the metric name, the optional list
// of fields it applies to and the right borders of the buckets.
type config struct {
	Metric  string   `toml:"metric_name"`
	Fields  []string `toml:"metric_fields"`
	Buckets buckets  `toml:"buckets"`
}

// bucketsByMetrics contains the buckets grouped by metric name and then by field name.
type bucketsByMetrics map[string]bucketsByFields

// bucketsByFields contains the buckets grouped by field name.
type bucketsByFields map[string]buckets

// buckets contains the right borders of the buckets.
type buckets []float64

// metricHistogramCollection aggregates the histogram data of one metric series.
type metricHistogramCollection struct {
	// histogramCollection maps a field name to its per-bucket hit counts.
	histogramCollection map[string]counts
	name                string
	tags                map[string]string
}

// counts is the number of hits per bucket; it has one more element than the
// bucket list, the last element being the hits above the greatest border.
type counts []int64

// groupedByCountFields is one output metric: a name, a tag set (including the
// bucket tag) and the cumulative count of every field that shares that tag set.
type groupedByCountFields struct {
	name            string
	tags            map[string]string
	fieldsWithCount map[string]int64
}

// NewHistogramAggregator creates a new histogram aggregator with an empty
// bucket cache and an empty hit cache.
func NewHistogramAggregator() telegraf.Aggregator {
	h := &HistogramAggregator{
		buckets: make(bucketsByMetrics),
	}
	h.resetCache()

	return h
}

var sampleConfig = `
  ## General Aggregator Arguments:
  ## The period on which to flush & clear the aggregator.
  period = "30s"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  drop_original = false

  ## The example of config to aggregate histogram for all fields of specified metric.
  [[aggregators.histogram.config]]
  ## The set of buckets.
  buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0]
  ## The name of metric.
  metric_name = "cpu"

  ## The example of config to aggregate for specified fields of metric.
  [[aggregators.histogram.config]]
  ## The set of buckets.
  buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
  ## The name of metric.
  metric_name = "diskio"
  ## The concrete fields of metric
  metric_fields = ["io_time", "read_time", "write_time"]
`

// SampleConfig returns the sample configuration of the plugin.
func (h *HistogramAggregator) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line description of the aggregator plugin.
func (h *HistogramAggregator) Description() string {
	return "Keep the aggregate histogram of each metric passing through."
}

// Add records one hit per configured field of the passed metric.
//
// A metric without any field that has buckets configured is ignored and leaves
// no trace in the cache. For every field with buckets a counts slice is
// allocated on first sight, even when the value is not numeric; such a field
// is therefore reported by Push with zero counts.
func (h *HistogramAggregator) Add(in telegraf.Metric) {
	// Call Fields() once and reuse the result for both passes below.
	fields := in.Fields()
	name := in.Name()

	bucketsByField := make(map[string][]float64, len(fields))
	for field := range fields {
		if borders := h.getBuckets(name, field); borders != nil {
			bucketsByField[field] = borders
		}
	}

	if len(bucketsByField) == 0 {
		return
	}

	// Keep an aggregator that was not built by NewHistogramAggregator usable.
	if h.cache == nil {
		h.resetCache()
	}

	id := in.HashID()
	agr, ok := h.cache[id]
	if !ok {
		agr = metricHistogramCollection{
			name:                name,
			tags:                in.Tags(),
			histogramCollection: make(map[string]counts, len(bucketsByField)),
		}
	}

	for field, borders := range bucketsByField {
		hits := agr.histogramCollection[field]
		if hits == nil {
			// One extra slot for values above the greatest border (+Inf).
			hits = make(counts, len(borders)+1)
			agr.histogramCollection[field] = hits
		}

		if value, ok := convert(fields[field]); ok {
			// SearchFloat64s yields the first border >= value, i.e. the
			// "less or equal" bucket, or len(borders) for the +Inf bucket.
			hits[sort.SearchFloat64s(borders, value)]++
		}
	}

	h.cache[id] = agr
}

// Push emits the cumulative histogram counts of all cached metrics to acc.
// Fields of the same metric that share a tag set (including the bucket tag)
// are emitted together as one metric.
func (h *HistogramAggregator) Push(acc telegraf.Accumulator) {
	var grouped []groupedByCountFields

	for _, aggregate := range h.cache {
		for field, hits := range aggregate.histogramCollection {
			// groupFieldsByBuckets mutates the tags it receives, so hand it a copy.
			h.groupFieldsByBuckets(&grouped, aggregate.name, field, copyTags(aggregate.tags), hits)
		}
	}

	for _, metric := range grouped {
		acc.AddFields(metric.name, makeFieldsWithCount(metric.fieldsWithCount), metric.tags)
	}
}

// groupFieldsByBuckets groups fields by metric buckets which are represented as tags.
//
// For every bucket border it adds the running (cumulative) count of the field
// under a tag set extended with the bucket tag, finishing with the +Inf
// bucket. The passed tags map is modified and ends up owned by the +Inf entry.
func (h *HistogramAggregator) groupFieldsByBuckets(
	metricsWithGroupedFields *[]groupedByCountFields,
	name string,
	field string,
	tags map[string]string,
	counts []int64,
) {
	count := int64(0)
	for index, bucket := range h.getBuckets(name, field) {
		count += counts[index]

		tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64)
		h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags))
	}

	count += counts[len(counts)-1]
	tags[bucketTag] = bucketInf

	h.groupField(metricsWithGroupedFields, name, field, count, tags)
}

// groupField stores the count of a field in the group that has the same name
// and tags, or appends a new group when there is none yet.
func (h *HistogramAggregator) groupField(
	metricsWithGroupedFields *[]groupedByCountFields,
	name string,
	field string,
	count int64,
	tags map[string]string,
) {
	for i := range *metricsWithGroupedFields {
		metric := &(*metricsWithGroupedFields)[i]
		if name == metric.name && isTagsIdentical(tags, metric.tags) {
			metric.fieldsWithCount[field] = count
			return
		}
	}

	*metricsWithGroupedFields = append(
		*metricsWithGroupedFields,
		groupedByCountFields{
			name:            name,
			tags:            tags,
			fieldsWithCount: map[string]int64{field: count},
		},
	)
}

// Reset does nothing: the counts are deliberately kept for a long time,
// because clearing them too often would leave each histogram with only a
// small part of the distribution.
// NOTE(review): presumably this is invoked once per configured 'period' — confirm against the aggregator runner.
func (h *HistogramAggregator) Reset() {}

// resetCache drops all cached counts (hits) of the buckets.
func (h *HistogramAggregator) resetCache() {
	h.cache = make(map[uint64]metricHistogramCollection)
}

// getBuckets returns the sorted bucket borders configured for the passed
// metric and field, or nil when no config matches.
//
// Resolved borders are cached. When several configs match, the last one wins.
func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 {
	if cached, ok := h.buckets[metric][field]; ok {
		return cached
	}

	for _, cfg := range h.Configs {
		if cfg.Metric != metric || !isBucketExists(field, cfg) {
			continue
		}

		// Keep an aggregator that was not built by NewHistogramAggregator usable.
		if h.buckets == nil {
			h.buckets = make(bucketsByMetrics)
		}
		if _, ok := h.buckets[metric]; !ok {
			h.buckets[metric] = make(bucketsByFields)
		}

		h.buckets[metric][field] = sortBuckets(cfg.Buckets)
	}

	return h.buckets[metric][field]
}

// isBucketExists reports whether the config applies to the passed field.
// A config without an explicit field list applies to every field.
func isBucketExists(field string, cfg config) bool {
	if len(cfg.Fields) == 0 {
		return true
	}

	for _, fl := range cfg.Fields {
		if fl == field {
			return true
		}
	}

	return false
}

// sortBuckets sorts the borders in place when they are not strictly
// increasing and returns the same slice.
func sortBuckets(buckets []float64) []float64 {
	for i := 1; i < len(buckets); i++ {
		if buckets[i-1] >= buckets[i] {
			sort.Float64s(buckets)
			break
		}
	}

	return buckets
}

// convert turns a numeric field value into a float64. The second result is
// false for every other type (strings, booleans, ...), which cannot be put
// into a bucket.
func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case float64:
		return v, true
	case int64:
		return float64(v), true
	case uint64:
		return float64(v), true
	default:
		return 0, false
	}
}

// copyTags returns a new map holding the same tags.
func copyTags(tags map[string]string) map[string]string {
	copiedTags := make(map[string]string, len(tags))
	for key, val := range tags {
		copiedTags[key] = val
	}

	return copiedTags
}

// isTagsIdentical reports whether both tag sets hold exactly the same keys
// with the same values.
func isTagsIdentical(originalTags, checkedTags map[string]string) bool {
	if len(originalTags) != len(checkedTags) {
		return false
	}

	for tagName, tagValue := range originalTags {
		// The presence check matters: a missing key must not match an empty value.
		if checkedValue, ok := checkedTags[tagName]; !ok || checkedValue != tagValue {
			return false
		}
	}

	return true
}

// makeFieldsWithCount builds the output fields: every field name gets the
// "_bucket" suffix and carries its count as the value.
func makeFieldsWithCount(fieldsWithCountIn map[string]int64) map[string]interface{} {
	fieldsWithCountOut := make(map[string]interface{}, len(fieldsWithCountIn))
	for field, count := range fieldsWithCountIn {
		fieldsWithCountOut[field+"_bucket"] = count
	}

	return fieldsWithCountOut
}

// init registers the histogram aggregator plugin.
func init() {
	aggregators.Add("histogram", func() telegraf.Aggregator {
		return NewHistogramAggregator()
	})
}