// Source: telegraf/plugins/processors/topk/topk.go (446 lines, 12 KiB, Go)

package topk
import (
"fmt"
"log"
"math"
"sort"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/processors"
)
// TopK is a processor that caches incoming metrics grouped by metric name and
// selected tags and, once Period has elapsed, returns only the metrics that
// belong to the K best-ranked groups for each configured field.
type TopK struct {
// Period is the minimum time between two aggregation passes; Apply only
// calls push once this much time has passed since the last Reset.
Period internal.Duration
// K is the number of groups selected per field on each aggregation pass.
K int
// GroupBy lists the tag-name globs whose matching tags become part of the
// group key. With an empty list the group key is built from the name only.
GroupBy []string `toml:"group_by"`
// Fields lists the fields that are aggregated and ranked.
Fields []string
// Aggregation names the aggregation to use: "sum", "min", "max" or "mean".
Aggregation string
// Bottomk selects the groups with the lowest aggregated values instead of
// the highest ones.
Bottomk bool
// AddGroupByTag, when non-empty, is the name of a tag added to every cached
// metric holding the generated group key.
AddGroupByTag string `toml:"add_groupby_tag"`
// AddRankFields lists the fields for which a "<field>_topk_rank" field is
// added to the selected metrics.
AddRankFields []string `toml:"add_rank_fields"`
// AddAggregateFields lists the fields for which a "<field>_topk_aggregate"
// field is added to the selected metrics.
AddAggregateFields []string `toml:"add_aggregate_fields"`
// cache maps a group key to the metrics collected for that group since the
// last Reset.
cache map[string][]telegraf.Metric
// tagsGlobs is the filter compiled from GroupBy the first time a group key
// is generated.
tagsGlobs filter.Filter
// rankFieldSet and aggFieldSet are lookup sets built from AddRankFields and
// AddAggregateFields on the first call to Apply.
rankFieldSet map[string]bool
aggFieldSet map[string]bool
// lastAggregation is the time of the last Reset.
lastAggregation time.Time
}
// New returns a TopK processor initialised with the default settings: a ten
// second period, k = 10, grouping over every tag ("*"), the "value" field,
// the "mean" aggregation, and no extra tag or fields added to the output.
func New() *TopK {
	t := &TopK{
		Period:             internal.Duration{Duration: 10 * time.Second},
		K:                  10,
		GroupBy:            []string{"*"},
		Fields:             []string{"value"},
		Aggregation:        "mean",
		AddGroupByTag:      "",
		AddRankFields:      []string{},
		AddAggregateFields: []string{},
	}

	// Reset allocates the metric cache and records the current time as the
	// start of the first aggregation period.
	t.Reset()
	return t
}
// sampleConfig is the commented-out TOML snippet documenting every setting of
// the plugin together with its default value.
//
// The explanatory text refers to the settings by their real keys
// ('add_rank_fields' and 'add_aggregate_fields'), matching the example lines
// that follow each paragraph.
var sampleConfig = `
## How many seconds between aggregations
# period = 10
## How many top metrics to return
# k = 10
## Over which tags should the aggregation be done. Globs can be specified, in
## which case any tag matching the glob will aggregated over. If set to an
## empty list is no aggregation over tags is done
# group_by = ['*']
## Over which fields are the top k are calculated
# fields = ["value"]
## What aggregation to use. Options: sum, mean, min, max
# aggregation = "mean"
## Instead of the top k largest metrics, return the bottom k lowest metrics
# bottomk = false
## The plugin assigns each metric a GroupBy tag generated from its name and
## tags. If this setting is different than "" the plugin will add a
## tag (which name will be the value of this setting) to each metric with
## the value of the calculated GroupBy tag. Useful for debugging
# add_groupby_tag = ""
## These settings provide a way to know the position of each metric in
## the top k. The 'add_rank_fields' setting allows to specify for which
## fields the position is required. If the list is non empty, then a field
## will be added to each and every metric for each string present in this
## setting. This field will contain the ranking of the group that
## the metric belonged to when aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_rank'
# add_rank_fields = []
## These settings provide a way to know what values the plugin is generating
## when aggregating metrics. The 'add_aggregate_fields' setting allows to
## specify for which fields the final aggregation value is required. If the
## list is non empty, then a field will be added to each every metric for
## each field present in this setting. This field will contain
## the computed aggregation for the group that the metric belonged to when
## aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_aggregate'
# add_aggregate_fields = []
`
// MetricAggregation holds the aggregated field values computed for one group
// of metrics.
type MetricAggregation struct {
	// groupbykey is the key of the group the values were computed for.
	groupbykey string
	// values maps a field name to the aggregated value of that field.
	values map[string]float64
}

// sortMetrics stably sorts metrics in place by the aggregated value stored
// under field. With reverse set the order is ascending (smallest first);
// otherwise it is descending (largest first). A group that has no value for
// field is ordered as if its value were 0.
//
// Both comparators are strict ("<" or ">"). Negating "<" would yield ">=",
// which reports equal elements as out of order in both directions and makes
// sort.SliceStable shuffle ties instead of preserving their input order.
func sortMetrics(metrics []MetricAggregation, field string, reverse bool) {
	if reverse {
		sort.SliceStable(metrics, func(i, j int) bool {
			return metrics[i].values[field] < metrics[j].values[field]
		})
		return
	}
	sort.SliceStable(metrics, func(i, j int) bool {
		return metrics[i].values[field] > metrics[j].values[field]
	})
}
// SampleConfig returns the sample configuration text stored in sampleConfig.
func (t *TopK) SampleConfig() string {
return sampleConfig
}
// Reset throws away every cached metric and restarts the aggregation period
// from the current time.
func (t *TopK) Reset() {
	t.lastAggregation = time.Now()
	t.cache = map[string][]telegraf.Metric{}
}
// Description returns a one-line summary of what the processor does.
//
// The previous text ("Print all metrics that pass through this filter.") did
// not describe this plugin: TopK does not print anything, it keeps only the
// metrics of the best-ranked groups.
func (t *TopK) Description() string {
	return "Keep only the top (or bottom) k series, ranked by an aggregated field over a period of time."
}
// generateGroupByKey builds the key identifying the group that m belongs to.
// The key is the metric name followed by "&" and then, for every tag whose
// name matches one of the GroupBy globs, a "tag=value&" pair; the pairs are
// emitted in sorted order. With an empty GroupBy the key is just the name and
// "&". An error is returned only when the GroupBy globs fail to compile.
func (t *TopK) generateGroupByKey(m telegraf.Metric) (string, error) {
	grouping := len(t.GroupBy) > 0

	// Compile the globs lazily, the first time a key is requested.
	if grouping && t.tagsGlobs == nil {
		var err error
		if t.tagsGlobs, err = filter.Compile(t.GroupBy); err != nil {
			return "", fmt.Errorf("could not compile pattern: %v %v", t.GroupBy, err)
		}
	}

	key := m.Name() + "&"
	if !grouping {
		return key, nil
	}

	tags := m.Tags()
	pairs := make([]string, 0, len(tags))
	for name, value := range tags {
		if !t.tagsGlobs.Match(name) {
			continue
		}
		pairs = append(pairs, name+"="+value+"&")
	}

	// Map iteration order is random, so sort the pairs to make the key
	// deterministic for a given tag set.
	sort.Strings(pairs)
	for _, pair := range pairs {
		key += pair
	}
	return key, nil
}
// groupBy stores m in the cache under its group key and, when AddGroupByTag
// is set, tags the metric with that key. A metric whose key cannot be
// generated is logged and left out of the cache.
func (t *TopK) groupBy(m telegraf.Metric) {
	key, err := t.generateGroupByKey(m)
	if err != nil {
		// Without a key the metric cannot be assigned to a group; the same
		// failure will repeat for every later metric.
		log.Printf("E! [processors.topk]: could not generate group key: %v", err)
		return
	}

	// Start a new bucket the first time this key is seen.
	bucket, seen := t.cache[key]
	if !seen {
		bucket = make([]telegraf.Metric, 0, 10)
	}
	t.cache[key] = append(bucket, m)

	// Expose the generated key as a tag when requested.
	if tagName := t.AddGroupByTag; tagName != "" {
		m.AddTag(tagName, key)
	}
}
// Apply caches every incoming metric that carries at least one of the
// configured fields. Once Period has elapsed since the last aggregation it
// returns the result of push; otherwise it returns an empty slice.
func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric {
	// Build the lookup sets on first use.
	if t.rankFieldSet == nil {
		ranks := make(map[string]bool)
		for _, name := range t.AddRankFields {
			ranks[name] = true
		}
		t.rankFieldSet = ranks
	}
	if t.aggFieldSet == nil {
		aggs := make(map[string]bool)
		for _, name := range t.AddAggregateFields {
			if name == "" {
				continue
			}
			aggs[name] = true
		}
		t.aggFieldSet = aggs
	}

	for _, m := range in {
		// When tracking metrics this plugin could deadlock the input by
		// holding undelivered metrics while the input waits for metrics to
		// be delivered. Instead, treat all handled metrics as delivered and
		// produced metrics as untracked in a similar way to aggregators.
		m.Drop()

		// Cache the metric as soon as one aggregated field is found on it;
		// metrics carrying none of the fields are not cached.
		for _, name := range t.Fields {
			if m.HasField(name) {
				t.groupBy(m)
				break
			}
		}
	}

	// Nothing is emitted until a full period has gone by.
	if time.Since(t.lastAggregation) < t.Period.Duration {
		return []telegraf.Metric{}
	}
	return t.push()
}
// min returns the smaller of the two integers a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// convert turns a field value into a float64. Only float64, int64 and uint64
// values are accepted; for any other type it returns 0 and false.
func convert(in interface{}) (float64, bool) {
	if f, ok := in.(float64); ok {
		return f, true
	}
	if i, ok := in.(int64); ok {
		return float64(i), true
	}
	if u, ok := in.(uint64); ok {
		return float64(u), true
	}
	return 0, false
}
// push runs one aggregation pass over the cache and returns copies of the
// metrics that belong to the selected groups.
//
// For every configured field the groups are sorted by their aggregated value
// (ascending when Bottomk is set, descending otherwise) and the first K are
// selected. The metrics of a selected group are annotated with
// "<field>_topk_aggregate" and "<field>_topk_rank" fields when the field is
// listed in the corresponding setting, and each group is returned at most
// once even if it is selected for several fields. The cache is reset before
// returning, on success and on failure alike.
func (t *TopK) push() []telegraf.Metric {
	aggregator, err := t.getAggregationFunction(t.Aggregation)
	if err != nil {
		// Fail hard by dropping all metrics. The cache has to be emptied
		// here as well: otherwise it would keep growing for as long as the
		// configuration is invalid, and this error would be logged on every
		// call to Apply instead of once per period.
		log.Printf("E! [processors.topk]: %v", err)
		t.Reset()
		return []telegraf.Metric{}
	}

	// Aggregate the selected fields of every cached group.
	aggregations := make([]MetricAggregation, 0, len(t.cache))
	for k, ms := range t.cache {
		aggregations = append(aggregations, MetricAggregation{groupbykey: k, values: aggregator(ms, t.Fields)})
	}

	// Number of groups selected per field. A negative K selects nothing
	// instead of panicking on the slice expression below.
	limit := min(t.K, len(aggregations))
	if limit < 0 {
		limit = 0
	}

	var ret []telegraf.Metric
	addedKeys := make(map[string]bool)
	for _, field := range t.Fields {
		sortMetrics(aggregations, field, t.Bottomk)

		// Whether extra fields are requested depends only on the field, so
		// look it up once instead of once per metric.
		_, addAggField := t.aggFieldSet[field]
		_, addRankField := t.rankFieldSet[field]

		for i, ag := range aggregations[:limit] {
			group := t.cache[ag.groupbykey]

			if addAggField || addRankField {
				for _, m := range group {
					// Only annotate metrics that carry the ranked field.
					if !m.HasField(field) {
						continue
					}
					if addAggField {
						m.AddField(field+"_topk_aggregate", ag.values[field])
					}
					if addRankField {
						m.AddField(field+"_topk_rank", i+1)
					}
				}
			}

			// Emit each group only once, even when it ranks for several fields.
			if !addedKeys[ag.groupbykey] {
				ret = append(ret, group...)
				addedKeys[ag.groupbykey] = true
			}
		}
	}

	t.Reset()

	// Return fresh copies of the selected metrics rather than the cached
	// instances themselves.
	result := make([]telegraf.Metric, 0, len(ret))
	for _, m := range ret {
		dup, err := metric.New(m.Name(), m.Tags(), m.Fields(), m.Time(), m.Type())
		if err != nil {
			// Skip the metric, but leave a trace instead of losing it silently.
			log.Printf("E! [processors.topk]: could not copy metric %q: %v", m.Name(), err)
			continue
		}
		result = append(result, dup)
	}
	return result
}
// getAggregationFunction returns the function implementing the aggregation
// named by aggOperation ("sum", "min", "max" or "mean"). The returned
// function takes the metrics of one group and the list of fields to
// aggregate, and returns a map from field name to aggregated value. Fields
// that are missing from every metric, or whose values cannot be converted to
// float64, do not appear in the map. The "mean" aggregation returns nil
// instead of an empty map when no value was found at all.
//
// An error is returned when aggOperation is not a known aggregation.
func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metric, []string) map[string]float64, error) {
	// fold walks every (metric, field) pair, converts the field value to a
	// float64 and hands it to f, which accumulates it into the result map.
	fold := func(ms []telegraf.Metric, fields []string, f func(map[string]float64, float64, string)) map[string]float64 {
		agg := make(map[string]float64)
		for _, m := range ms {
			// Fetch the field map once per metric, not once per field.
			values := m.Fields()
			for _, field := range fields {
				fieldVal, ok := values[field]
				if !ok {
					continue // Skip if this metric doesn't have this field set
				}
				val, ok := convert(fieldVal)
				if !ok {
					// %v is required here: the value is an arbitrary
					// interface, and %s would render e.g. a bool as
					// "%!s(bool=true)".
					log.Printf("E! [processors.topk]: Cannot convert value '%v' from metric '%s' with tags '%v'",
						fieldVal, m.Name(), m.Tags())
					continue
				}
				f(agg, val, field)
			}
		}
		return agg
	}

	switch aggOperation {
	case "sum":
		return func(ms []telegraf.Metric, fields []string) map[string]float64 {
			return fold(ms, fields, func(agg map[string]float64, val float64, field string) {
				agg[field] += val
			})
		}, nil

	case "min":
		return func(ms []telegraf.Metric, fields []string) map[string]float64 {
			return fold(ms, fields, func(agg map[string]float64, val float64, field string) {
				// Seed with +Inf, the identity of min. math.MaxFloat64 would
				// be reported as the minimum of a group whose only value is
				// +Inf.
				if _, ok := agg[field]; !ok {
					agg[field] = math.Inf(1)
				}
				if val < agg[field] {
					agg[field] = val
				}
			})
		}, nil

	case "max":
		return func(ms []telegraf.Metric, fields []string) map[string]float64 {
			return fold(ms, fields, func(agg map[string]float64, val float64, field string) {
				// Seed with -Inf, the identity of max (see "min" above).
				if _, ok := agg[field]; !ok {
					agg[field] = math.Inf(-1)
				}
				if val > agg[field] {
					agg[field] = val
				}
			})
		}, nil

	case "mean":
		return func(ms []telegraf.Metric, fields []string) map[string]float64 {
			// Accumulate the per-field sums with the shared fold and count
			// the samples on the side.
			counts := make(map[string]float64)
			sums := fold(ms, fields, func(agg map[string]float64, val float64, field string) {
				agg[field] += val
				counts[field]++
			})

			// No field had a usable value.
			if len(sums) == 0 {
				return nil
			}

			// A field is present in sums only if it was counted at least
			// once, so the division is always well defined.
			for field := range sums {
				sums[field] /= counts[field]
			}
			return sums
		}, nil

	default:
		// Report the operation that was actually requested.
		return nil, fmt.Errorf("Unknown aggregation function '%s'. No metrics will be processed", aggOperation)
	}
}
func init() {
processors.Add("topk", func() telegraf.Processor {
return New()
})
}