Add topk processor plugin (#4096)
This commit is contained in:
parent
4c35a56edd
commit
ac9b308cee
|
@ -3,4 +3,5 @@ package all
|
||||||
import (
|
import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||||
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
_ "github.com/influxdata/telegraf/plugins/processors/printer"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/processors/topk"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
# TopK Processor Plugin
|
||||||
|
|
||||||
|
The TopK processor plugin is a filter designed to get the top series over a period of time. It can be tweaked to do its top k computation over a period of time, so spikes can be smoothed out.
|
||||||
|
|
||||||
|
This processor goes through these steps when processing a batch of metrics:
|
||||||
|
|
||||||
|
1. Groups metrics in buckets using their tags and name as key
|
||||||
|
2. Aggregates each of the selected fields for each bucket by the selected aggregation function (sum, mean, etc)
|
||||||
|
3. Orders the buckets by one of the generated aggregations, returns all metrics in the top `K` buckets, then reorders the buckets by the next of the generated aggregations, returns all metrics in the top `K` buckets, etc, etc, etc, until it runs out of fields.
|
||||||
|
|
||||||
|
The plugin makes sure not to duplicate metrics
|
||||||
|
|
||||||
|
Note that depending on the amount of metrics on each computed bucket, more than `K` metrics may be returned
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.topk]]
|
||||||
|
## How many seconds between aggregations
|
||||||
|
# period = 10
|
||||||
|
|
||||||
|
## How many top metrics to return
|
||||||
|
# k = 10
|
||||||
|
|
||||||
|
## Over which tags should the aggregation be done. Globs can be specified, in
|
||||||
|
## which case any tag matching the glob will aggregated over. If set to an
|
||||||
|
## empty list is no aggregation over tags is done
|
||||||
|
# group_by = ['*']
|
||||||
|
|
||||||
|
## Over which fields are the top k are calculated
|
||||||
|
# fields = ["value"]
|
||||||
|
|
||||||
|
## What aggregation to use. Options: sum, mean, min, max
|
||||||
|
# aggregation = "mean"
|
||||||
|
|
||||||
|
## Instead of the top k largest metrics, return the bottom k lowest metrics
|
||||||
|
# bottomk = false
|
||||||
|
|
||||||
|
## The plugin assigns each metric a GroupBy tag generated from its name and
|
||||||
|
## tags. If this setting is different than "" the plugin will add a
|
||||||
|
## tag (which name will be the value of this setting) to each metric with
|
||||||
|
## the value of the calculated GroupBy tag. Useful for debugging
|
||||||
|
# add_groupby_tag = ""
|
||||||
|
|
||||||
|
## These settings provide a way to know the position of each metric in
|
||||||
|
## the top k. The 'add_rank_field' setting allows to specify for which
|
||||||
|
## fields the position is required. If the list is non empty, then a field
|
||||||
|
## will be added to each and every metric for each string present in this
|
||||||
|
## setting. This field will contain the ranking of the group that
|
||||||
|
## the metric belonged to when aggregated over that field.
|
||||||
|
## The name of the field will be set to the name of the aggregation field,
|
||||||
|
## suffixed with the string '_topk_rank'
|
||||||
|
# add_rank_fields = []
|
||||||
|
|
||||||
|
## These settings provide a way to know what values the plugin is generating
|
||||||
|
## when aggregating metrics. The 'add_agregate_field' setting allows to
|
||||||
|
## specify for which fields the final aggregation value is required. If the
|
||||||
|
## list is non empty, then a field will be added to each every metric for
|
||||||
|
## each field present in this setting. This field will contain
|
||||||
|
## the computed aggregation for the group that the metric belonged to when
|
||||||
|
## aggregated over that field.
|
||||||
|
## The name of the field will be set to the name of the aggregation field,
|
||||||
|
## suffixed with the string '_topk_aggregate'
|
||||||
|
# add_aggregate_fields = []
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
This processor does not add tags by default. But the setting `add_groupby_tag` will add a tag if set to anything other than ""
|
||||||
|
|
||||||
|
|
||||||
|
### Fields:
|
||||||
|
|
||||||
|
This processor does not add fields by default. But the settings `add_rank_fields` and `add_aggregation_fields` will add one or several fields if set to anything other than ""
|
|
@ -0,0 +1,163 @@
|
||||||
|
package topk
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
///// Test set 1 /////
|
||||||
|
var metric11, _ = metric.New(
|
||||||
|
"m1",
|
||||||
|
map[string]string{"tag_name": "tag_value1"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(15.3),
|
||||||
|
"b": float64(40),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric12, _ = metric.New(
|
||||||
|
"m1",
|
||||||
|
map[string]string{"tag_name": "tag_value1"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(50),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric13, _ = metric.New(
|
||||||
|
"m1",
|
||||||
|
map[string]string{"tag_name": "tag_value1"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(0.3),
|
||||||
|
"c": float64(400),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric14, _ = metric.New(
|
||||||
|
"m1",
|
||||||
|
map[string]string{"tag_name": "tag_value1"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(24.12),
|
||||||
|
"b": float64(40),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric15, _ = metric.New(
|
||||||
|
"m1",
|
||||||
|
map[string]string{"tag_name": "tag_value1"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": float64(50.5),
|
||||||
|
"h": float64(1),
|
||||||
|
"u": float64(2.4),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var MetricsSet1 = []telegraf.Metric{metric11, metric12, metric13, metric14, metric15}
|
||||||
|
|
||||||
|
///// Test set 2 /////
|
||||||
|
var metric21, _ = metric.New(
|
||||||
|
"metric1",
|
||||||
|
map[string]string{
|
||||||
|
"id": "1",
|
||||||
|
"tag1": "ONE",
|
||||||
|
"tag2": "FIVE",
|
||||||
|
"tag3": "SIX",
|
||||||
|
"tag4": "EIGHT",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(31.31),
|
||||||
|
"A": float64(95.36),
|
||||||
|
"C": float64(72.41),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric22, _ = metric.New(
|
||||||
|
"metric1",
|
||||||
|
map[string]string{
|
||||||
|
"id": "2",
|
||||||
|
"tag1": "TWO",
|
||||||
|
"tag2": "FOUR",
|
||||||
|
"tag3": "THREE",
|
||||||
|
"tag4": "EIGHT",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(59.43),
|
||||||
|
"A": float64(0.6),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric23, _ = metric.New(
|
||||||
|
"metric1",
|
||||||
|
map[string]string{
|
||||||
|
"id": "3",
|
||||||
|
"tag1": "TWO",
|
||||||
|
"tag2": "FOUR",
|
||||||
|
"tag3": "SIX",
|
||||||
|
"tag5": "TEN",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(74.18),
|
||||||
|
"A": float64(77.42),
|
||||||
|
"B": float64(60.96),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric24, _ = metric.New(
|
||||||
|
"metric2",
|
||||||
|
map[string]string{
|
||||||
|
"id": "4",
|
||||||
|
"tag1": "ONE",
|
||||||
|
"tag2": "FIVE",
|
||||||
|
"tag3": "THREE",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(72),
|
||||||
|
"B": float64(22.1),
|
||||||
|
"C": float64(30.8),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric25, _ = metric.New(
|
||||||
|
"metric2",
|
||||||
|
map[string]string{
|
||||||
|
"id": "5",
|
||||||
|
"tag1": "TWO",
|
||||||
|
"tag2": "FOUR",
|
||||||
|
"tag3": "SEVEN",
|
||||||
|
"tag4": "NINE",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(87.92),
|
||||||
|
"B": float64(81.55),
|
||||||
|
"C": float64(45.1),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var metric26, _ = metric.New(
|
||||||
|
"metric2",
|
||||||
|
map[string]string{
|
||||||
|
"id": "6",
|
||||||
|
"tag1": "TWO",
|
||||||
|
"tag2": "FIVE",
|
||||||
|
"tag3": "SEVEN",
|
||||||
|
"tag4": "NINE",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": float64(75.3),
|
||||||
|
"A": float64(29.45),
|
||||||
|
"C": float64(4.86),
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var MetricsSet2 = []telegraf.Metric{metric21, metric22, metric23, metric24, metric25, metric26}
|
|
@ -0,0 +1,432 @@
|
||||||
|
package topk
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TopK struct {
|
||||||
|
Period internal.Duration
|
||||||
|
K int
|
||||||
|
GroupBy []string `toml:"group_by"`
|
||||||
|
Fields []string
|
||||||
|
Aggregation string
|
||||||
|
Bottomk bool
|
||||||
|
AddGroupByTag string `toml:"add_groupby_tag"`
|
||||||
|
AddRankFields []string `toml:"add_rank_fields"`
|
||||||
|
AddAggregateFields []string `toml:"add_aggregate_fields"`
|
||||||
|
|
||||||
|
cache map[string][]telegraf.Metric
|
||||||
|
tagsGlobs filter.Filter
|
||||||
|
rankFieldSet map[string]bool
|
||||||
|
aggFieldSet map[string]bool
|
||||||
|
lastAggregation time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() *TopK {
|
||||||
|
// Create object
|
||||||
|
topk := TopK{}
|
||||||
|
|
||||||
|
// Setup defaults
|
||||||
|
topk.Period = internal.Duration{Duration: time.Second * time.Duration(10)}
|
||||||
|
topk.K = 10
|
||||||
|
topk.Fields = []string{"value"}
|
||||||
|
topk.Aggregation = "mean"
|
||||||
|
topk.GroupBy = []string{"*"}
|
||||||
|
topk.AddGroupByTag = ""
|
||||||
|
topk.AddRankFields = []string{""}
|
||||||
|
topk.AddAggregateFields = []string{""}
|
||||||
|
|
||||||
|
// Initialize cache
|
||||||
|
topk.Reset()
|
||||||
|
|
||||||
|
return &topk
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## How many seconds between aggregations
|
||||||
|
# period = 10
|
||||||
|
|
||||||
|
## How many top metrics to return
|
||||||
|
# k = 10
|
||||||
|
|
||||||
|
## Over which tags should the aggregation be done. Globs can be specified, in
|
||||||
|
## which case any tag matching the glob will aggregated over. If set to an
|
||||||
|
## empty list is no aggregation over tags is done
|
||||||
|
# group_by = ['*']
|
||||||
|
|
||||||
|
## Over which fields are the top k are calculated
|
||||||
|
# fields = ["value"]
|
||||||
|
|
||||||
|
## What aggregation to use. Options: sum, mean, min, max
|
||||||
|
# aggregation = "mean"
|
||||||
|
|
||||||
|
## Instead of the top k largest metrics, return the bottom k lowest metrics
|
||||||
|
# bottomk = false
|
||||||
|
|
||||||
|
## The plugin assigns each metric a GroupBy tag generated from its name and
|
||||||
|
## tags. If this setting is different than "" the plugin will add a
|
||||||
|
## tag (which name will be the value of this setting) to each metric with
|
||||||
|
## the value of the calculated GroupBy tag. Useful for debugging
|
||||||
|
# add_groupby_tag = ""
|
||||||
|
|
||||||
|
## These settings provide a way to know the position of each metric in
|
||||||
|
## the top k. The 'add_rank_field' setting allows to specify for which
|
||||||
|
## fields the position is required. If the list is non empty, then a field
|
||||||
|
## will be added to each and every metric for each string present in this
|
||||||
|
## setting. This field will contain the ranking of the group that
|
||||||
|
## the metric belonged to when aggregated over that field.
|
||||||
|
## The name of the field will be set to the name of the aggregation field,
|
||||||
|
## suffixed with the string '_topk_rank'
|
||||||
|
# add_rank_fields = []
|
||||||
|
|
||||||
|
## These settings provide a way to know what values the plugin is generating
|
||||||
|
## when aggregating metrics. The 'add_agregate_field' setting allows to
|
||||||
|
## specify for which fields the final aggregation value is required. If the
|
||||||
|
## list is non empty, then a field will be added to each every metric for
|
||||||
|
## each field present in this setting. This field will contain
|
||||||
|
## the computed aggregation for the group that the metric belonged to when
|
||||||
|
## aggregated over that field.
|
||||||
|
## The name of the field will be set to the name of the aggregation field,
|
||||||
|
## suffixed with the string '_topk_aggregate'
|
||||||
|
# add_aggregate_fields = []
|
||||||
|
`
|
||||||
|
|
||||||
|
type MetricAggregation struct {
|
||||||
|
groupbykey string
|
||||||
|
values map[string]float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortMetrics(metrics []MetricAggregation, field string, reverse bool) {
|
||||||
|
less := func(i, j int) bool {
|
||||||
|
iv := metrics[i].values[field]
|
||||||
|
jv := metrics[j].values[field]
|
||||||
|
if iv < jv {
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if reverse {
|
||||||
|
sort.SliceStable(metrics, less)
|
||||||
|
} else {
|
||||||
|
sort.SliceStable(metrics, func(i, j int) bool { return !less(i, j) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) Reset() {
|
||||||
|
t.cache = make(map[string][]telegraf.Metric)
|
||||||
|
t.lastAggregation = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) Description() string {
|
||||||
|
return "Print all metrics that pass through this filter."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) generateGroupByKey(m telegraf.Metric) (string, error) {
|
||||||
|
// Create the filter.Filter objects if they have not been created
|
||||||
|
if t.tagsGlobs == nil && len(t.GroupBy) > 0 {
|
||||||
|
var err error
|
||||||
|
t.tagsGlobs, err = filter.Compile(t.GroupBy)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("could not compile pattern: %v %v", t.GroupBy, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
groupkey := m.Name() + "&"
|
||||||
|
|
||||||
|
if len(t.GroupBy) > 0 {
|
||||||
|
tags := m.Tags()
|
||||||
|
keys := make([]string, 0, len(tags))
|
||||||
|
for tag, value := range tags {
|
||||||
|
if t.tagsGlobs.Match(tag) {
|
||||||
|
keys = append(keys, tag+"="+value+"&")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Sorting the selected tags is necessary because dictionaries
|
||||||
|
// do not ensure any specific or deterministic ordering
|
||||||
|
sort.SliceStable(keys, func(i, j int) bool { return keys[i] < keys[j] })
|
||||||
|
for _, str := range keys {
|
||||||
|
groupkey += str
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return groupkey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) groupBy(m telegraf.Metric) {
|
||||||
|
// Generate the metric group key
|
||||||
|
groupkey, err := t.generateGroupByKey(m)
|
||||||
|
if err != nil {
|
||||||
|
// If we could not generate the groupkey, fail hard
|
||||||
|
// by dropping this and all subsequent metrics
|
||||||
|
log.Printf("E! [processors.topk]: could not generate group key: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the key with an empty list if necessary
|
||||||
|
if _, ok := t.cache[groupkey]; !ok {
|
||||||
|
t.cache[groupkey] = make([]telegraf.Metric, 0, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append the metric to the corresponding key list
|
||||||
|
t.cache[groupkey] = append(t.cache[groupkey], m)
|
||||||
|
|
||||||
|
// Add the generated groupby key tag to the metric if requested
|
||||||
|
if t.AddGroupByTag != "" {
|
||||||
|
m.AddTag(t.AddGroupByTag, groupkey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
// Init any internal datastructures that are not initialized yet
|
||||||
|
if t.rankFieldSet == nil {
|
||||||
|
t.rankFieldSet = make(map[string]bool)
|
||||||
|
for _, f := range t.AddRankFields {
|
||||||
|
t.rankFieldSet[f] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if t.aggFieldSet == nil {
|
||||||
|
t.aggFieldSet = make(map[string]bool)
|
||||||
|
for _, f := range t.AddAggregateFields {
|
||||||
|
t.aggFieldSet[f] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the metrics received to our internal cache
|
||||||
|
for _, m := range in {
|
||||||
|
|
||||||
|
// Check if the metric has any of the fields over wich we are aggregating
|
||||||
|
hasField := false
|
||||||
|
for _, f := range t.Fields {
|
||||||
|
if m.HasField(f) {
|
||||||
|
hasField = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasField {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the metric to the internal cache
|
||||||
|
t.groupBy(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If enough time has passed
|
||||||
|
elapsed := time.Since(t.lastAggregation)
|
||||||
|
if elapsed >= t.Period.Duration {
|
||||||
|
return t.push()
|
||||||
|
}
|
||||||
|
|
||||||
|
return []telegraf.Metric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func min(a, b int) int {
|
||||||
|
if a > b {
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
func convert(in interface{}) (float64, bool) {
|
||||||
|
switch v := in.(type) {
|
||||||
|
case float64:
|
||||||
|
return v, true
|
||||||
|
case int64:
|
||||||
|
return float64(v), true
|
||||||
|
case uint64:
|
||||||
|
return float64(v), true
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TopK) push() []telegraf.Metric {
|
||||||
|
// Generate aggregations list using the selected fields
|
||||||
|
aggregations := make([]MetricAggregation, 0, 100)
|
||||||
|
aggregator, err := t.getAggregationFunction(t.Aggregation)
|
||||||
|
if err != nil {
|
||||||
|
// If we could not generate the aggregation
|
||||||
|
// function, fail hard by dropping all metrics
|
||||||
|
log.Printf("E! [processors.topk]: %v", err)
|
||||||
|
return []telegraf.Metric{}
|
||||||
|
}
|
||||||
|
for k, ms := range t.cache {
|
||||||
|
aggregations = append(aggregations, MetricAggregation{groupbykey: k, values: aggregator(ms, t.Fields)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// The return value that will hold the returned metrics
|
||||||
|
var ret []telegraf.Metric = make([]telegraf.Metric, 0, 0)
|
||||||
|
|
||||||
|
// Get the top K metrics for each field and add them to the return value
|
||||||
|
addedKeys := make(map[string]bool)
|
||||||
|
groupTag := t.AddGroupByTag
|
||||||
|
for _, field := range t.Fields {
|
||||||
|
|
||||||
|
// Sort the aggregations
|
||||||
|
sortMetrics(aggregations, field, t.Bottomk)
|
||||||
|
|
||||||
|
// Create a one dimentional list with the top K metrics of each key
|
||||||
|
for i, ag := range aggregations[0:min(t.K, len(aggregations))] {
|
||||||
|
|
||||||
|
// Check whether of not we need to add fields of tags to the selected metrics
|
||||||
|
if len(t.aggFieldSet) != 0 || len(t.rankFieldSet) != 0 || groupTag != "" {
|
||||||
|
for _, m := range t.cache[ag.groupbykey] {
|
||||||
|
|
||||||
|
// Add the aggregation final value if requested
|
||||||
|
_, addAggField := t.aggFieldSet[field]
|
||||||
|
if addAggField && m.HasField(field) {
|
||||||
|
m.AddField(field+"_topk_aggregate", ag.values[field])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the rank relative to the current field if requested
|
||||||
|
_, addRankField := t.rankFieldSet[field]
|
||||||
|
if addRankField && m.HasField(field) {
|
||||||
|
m.AddField(field+"_topk_rank", i+1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add metrics if we have not already appended them to the return value
|
||||||
|
_, ok := addedKeys[ag.groupbykey]
|
||||||
|
if !ok {
|
||||||
|
ret = append(ret, t.cache[ag.groupbykey]...)
|
||||||
|
addedKeys[ag.groupbykey] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Reset()
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function that generates the aggregation functions
|
||||||
|
func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metric, []string) map[string]float64, error) {
|
||||||
|
|
||||||
|
// This is a function aggregates a set of metrics using a given aggregation function
|
||||||
|
var aggregator = func(ms []telegraf.Metric, fields []string, f func(map[string]float64, float64, string)) map[string]float64 {
|
||||||
|
agg := make(map[string]float64)
|
||||||
|
// Compute the sums of the selected fields over all the measurements collected for this metric
|
||||||
|
for _, m := range ms {
|
||||||
|
for _, field := range fields {
|
||||||
|
fieldVal, ok := m.Fields()[field]
|
||||||
|
if !ok {
|
||||||
|
continue // Skip if this metric doesn't have this field set
|
||||||
|
}
|
||||||
|
val, ok := convert(fieldVal)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||||
|
m.Fields()[field], m.Name(), m.Tags())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
f(agg, val, field)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return agg
|
||||||
|
}
|
||||||
|
|
||||||
|
switch aggOperation {
|
||||||
|
case "sum":
|
||||||
|
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
|
||||||
|
sum := func(agg map[string]float64, val float64, field string) {
|
||||||
|
agg[field] += val
|
||||||
|
}
|
||||||
|
return aggregator(ms, fields, sum)
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
case "min":
|
||||||
|
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
|
||||||
|
min := func(agg map[string]float64, val float64, field string) {
|
||||||
|
// If this field has not been set, set it to the maximum float64
|
||||||
|
_, ok := agg[field]
|
||||||
|
if !ok {
|
||||||
|
agg[field] = math.MaxFloat64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we've found a new minimum
|
||||||
|
if agg[field] > val {
|
||||||
|
agg[field] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return aggregator(ms, fields, min)
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
case "max":
|
||||||
|
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
|
||||||
|
max := func(agg map[string]float64, val float64, field string) {
|
||||||
|
// If this field has not been set, set it to the minimum float64
|
||||||
|
_, ok := agg[field]
|
||||||
|
if !ok {
|
||||||
|
agg[field] = -math.MaxFloat64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we've found a new maximum
|
||||||
|
if agg[field] < val {
|
||||||
|
agg[field] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return aggregator(ms, fields, max)
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
case "mean":
|
||||||
|
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
|
||||||
|
mean := make(map[string]float64)
|
||||||
|
meanCounters := make(map[string]float64)
|
||||||
|
// Compute the sums of the selected fields over all the measurements collected for this metric
|
||||||
|
for _, m := range ms {
|
||||||
|
for _, field := range fields {
|
||||||
|
fieldVal, ok := m.Fields()[field]
|
||||||
|
if !ok {
|
||||||
|
continue // Skip if this metric doesn't have this field set
|
||||||
|
}
|
||||||
|
val, ok := convert(fieldVal)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||||
|
m.Fields()[field], m.Name(), m.Tags())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
mean[field] += val
|
||||||
|
meanCounters[field] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Divide by the number of recorded measurements collected for every field
|
||||||
|
noMeasurementsFound := true // Canary to check if no field with values was found, so we can return nil
|
||||||
|
for k, _ := range mean {
|
||||||
|
if meanCounters[k] == 0 {
|
||||||
|
mean[k] = 0
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
mean[k] = mean[k] / meanCounters[k]
|
||||||
|
noMeasurementsFound = noMeasurementsFound && false
|
||||||
|
}
|
||||||
|
|
||||||
|
if noMeasurementsFound {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return mean
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unknown aggregation function '%s'. No metrics will be processed", t.Aggregation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("topk", func() telegraf.Processor {
|
||||||
|
return New()
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,532 @@
|
||||||
|
package topk
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Key, value pair that represents a telegraf.Metric Field
|
||||||
|
type field struct {
|
||||||
|
key string
|
||||||
|
val interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func fieldList(fields ...field) []field {
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key, value pair that represents a telegraf.Metric Tag
|
||||||
|
type tag struct {
|
||||||
|
key string
|
||||||
|
val string
|
||||||
|
}
|
||||||
|
|
||||||
|
func tagList(tags ...tag) []tag {
|
||||||
|
return tags
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abstraction of a change in a single metric
|
||||||
|
type metricChange struct {
|
||||||
|
newFields []field // Fieldsthat should be added to the metric
|
||||||
|
newTags []tag // Tags that should be added to the metric
|
||||||
|
|
||||||
|
runHash bool // Sometimes the metrics' HashID must be run so reflect.DeepEqual works
|
||||||
|
// This happens because telegraf.Metric mantains an internal cache of
|
||||||
|
// its hash value that is set when HashID() is called for the first time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate a new set of metrics from a set of changes. This is used to generate an answer which will be
|
||||||
|
// compare against the output of the processor
|
||||||
|
// NOTE: A `changeSet` is a map where the keys are the indices of the metrics to keep, and the values
|
||||||
|
// are list of new tags and fields to be added to the metric in that index.
|
||||||
|
// THE ORDERING OF THE NEW TAGS AND FIELDS MATTERS. When using reflect.DeepEqual to compare metrics,
|
||||||
|
// comparing metrics that have the same fields/tags added in different orders will return false, although
|
||||||
|
// they are semantically equal.
|
||||||
|
// Therefore the fields and tags must be in the same order that the processor would add them
|
||||||
|
func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []telegraf.Metric {
|
||||||
|
answer := []telegraf.Metric{}
|
||||||
|
|
||||||
|
// For every input metric, we check if there is a change we need to apply
|
||||||
|
// If there is no change for a given input metric, the metric is dropped
|
||||||
|
for i, metric := range input {
|
||||||
|
change, ok := changeSet[i]
|
||||||
|
if ok {
|
||||||
|
// Deep copy the metric
|
||||||
|
newMetric := metric.Copy()
|
||||||
|
|
||||||
|
// Add new fields
|
||||||
|
if change.newFields != nil {
|
||||||
|
for _, p := range change.newFields {
|
||||||
|
newMetric.AddField(p.key, p.val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new tags
|
||||||
|
if change.newTags != nil {
|
||||||
|
for _, p := range change.newTags {
|
||||||
|
newMetric.AddTag(p.key, p.val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the hash function if required
|
||||||
|
if change.runHash {
|
||||||
|
newMetric.HashID()
|
||||||
|
}
|
||||||
|
|
||||||
|
answer = append(answer, newMetric)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return answer
|
||||||
|
}
|
||||||
|
|
||||||
|
func deepCopy(a []telegraf.Metric) []telegraf.Metric {
|
||||||
|
ret := make([]telegraf.Metric, 0, len(a))
|
||||||
|
for _, m := range a {
|
||||||
|
ret = append(ret, m.Copy())
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func belongs(m telegraf.Metric, ms []telegraf.Metric) bool {
|
||||||
|
for _, i := range ms {
|
||||||
|
if reflect.DeepEqual(i, m) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func subSet(a []telegraf.Metric, b []telegraf.Metric) bool {
|
||||||
|
subset := true
|
||||||
|
for _, m := range a {
|
||||||
|
if !belongs(m, b) {
|
||||||
|
subset = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return subset
|
||||||
|
}
|
||||||
|
|
||||||
|
func equalSets(l1 []telegraf.Metric, l2 []telegraf.Metric) bool {
|
||||||
|
return subSet(l1, l2) && subSet(l2, l1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDuration(t int) internal.Duration {
|
||||||
|
return internal.Duration{Duration: time.Second * time.Duration(t)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAndCompare(topk *TopK, metrics []telegraf.Metric, answer []telegraf.Metric, testID string, t *testing.T) {
|
||||||
|
// Sleep for `period`, otherwise the processor will only
|
||||||
|
// cache the metrics, but it will not process them
|
||||||
|
time.Sleep(topk.Period.Duration)
|
||||||
|
|
||||||
|
// Run the processor
|
||||||
|
ret := topk.Apply(metrics...)
|
||||||
|
topk.Reset()
|
||||||
|
|
||||||
|
// The returned set mut be equal to the answer set
|
||||||
|
if !equalSets(ret, answer) {
|
||||||
|
t.Error("\nExpected metrics for", testID, ":\n",
|
||||||
|
answer, "\nReturned metrics:\n", ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Smoke tests
|
||||||
|
func TestTopkAggregatorsSmokeTests(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.Fields = []string{"a"}
|
||||||
|
topk.GroupBy = []string{"tag_name"}
|
||||||
|
|
||||||
|
aggregators := []string{"mean", "sum", "max", "min"}
|
||||||
|
|
||||||
|
//The answer is equal to the original set for these particual scenarios
|
||||||
|
input := MetricsSet1
|
||||||
|
answer := MetricsSet1
|
||||||
|
|
||||||
|
for _, ag := range aggregators {
|
||||||
|
topk.Aggregation = ag
|
||||||
|
|
||||||
|
runAndCompare(&topk, input, answer, "SmokeAggregator_"+ag, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddAggregateFields + Mean aggregator
|
||||||
|
func TestTopkMeanAddAggregateFields(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.Aggregation = "mean"
|
||||||
|
topk.AddAggregateFields = []string{"a"}
|
||||||
|
topk.Fields = []string{"a"}
|
||||||
|
topk.GroupBy = []string{"tag_name"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet1)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"a_topk_aggregate", float64(28.044)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: chng},
|
||||||
|
1: metricChange{newFields: chng},
|
||||||
|
2: metricChange{newFields: chng},
|
||||||
|
3: metricChange{newFields: chng},
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "MeanAddAggregateFields test", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddAggregateFields + Sum aggregator
|
||||||
|
func TestTopkSumAddAggregateFields(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.AddAggregateFields = []string{"a"}
|
||||||
|
topk.Fields = []string{"a"}
|
||||||
|
topk.GroupBy = []string{"tag_name"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet1)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"a_topk_aggregate", float64(140.22)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: chng},
|
||||||
|
1: metricChange{newFields: chng},
|
||||||
|
2: metricChange{newFields: chng},
|
||||||
|
3: metricChange{newFields: chng},
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "SumAddAggregateFields test", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddAggregateFields + Max aggregator
|
||||||
|
func TestTopkMaxAddAggregateFields(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.Aggregation = "max"
|
||||||
|
topk.AddAggregateFields = []string{"a"}
|
||||||
|
topk.Fields = []string{"a"}
|
||||||
|
topk.GroupBy = []string{"tag_name"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet1)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"a_topk_aggregate", float64(50.5)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: chng},
|
||||||
|
1: metricChange{newFields: chng},
|
||||||
|
2: metricChange{newFields: chng},
|
||||||
|
3: metricChange{newFields: chng},
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "MaxAddAggregateFields test", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddAggregateFields + Min aggregator
|
||||||
|
func TestTopkMinAddAggregateFields(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.Aggregation = "min"
|
||||||
|
topk.AddAggregateFields = []string{"a"}
|
||||||
|
topk.Fields = []string{"a"}
|
||||||
|
topk.GroupBy = []string{"tag_name"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet1)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"a_topk_aggregate", float64(0.3)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: chng},
|
||||||
|
1: metricChange{newFields: chng},
|
||||||
|
2: metricChange{newFields: chng},
|
||||||
|
3: metricChange{newFields: chng},
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "MinAddAggregateFields test", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupBy
|
||||||
|
func TestTopkGroupby1(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 3
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.AddAggregateFields = []string{"value"}
|
||||||
|
topk.GroupBy = []string{"tag[13]"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
2: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(74.18)})},
|
||||||
|
3: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(72)})},
|
||||||
|
4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
|
||||||
|
5: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy test 1", t)
|
||||||
|
}
|
||||||
|
func TestTopkGroupby2(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 3
|
||||||
|
topk.Aggregation = "mean"
|
||||||
|
topk.AddAggregateFields = []string{"value"}
|
||||||
|
topk.GroupBy = []string{"tag1"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng1 := fieldList(field{"value_topk_aggregate", float64(66.805)})
|
||||||
|
chng2 := fieldList(field{"value_topk_aggregate", float64(72)})
|
||||||
|
chng3 := fieldList(field{"value_topk_aggregate", float64(81.61)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
1: metricChange{newFields: chng1},
|
||||||
|
2: metricChange{newFields: chng1},
|
||||||
|
3: metricChange{newFields: chng2},
|
||||||
|
4: metricChange{newFields: chng3},
|
||||||
|
5: metricChange{newFields: chng3},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy test 2", t)
|
||||||
|
}
|
||||||
|
func TestTopkGroupby3(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 1
|
||||||
|
topk.Aggregation = "min"
|
||||||
|
topk.AddAggregateFields = []string{"value"}
|
||||||
|
topk.GroupBy = []string{"tag4"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"value_topk_aggregate", float64(75.3)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
5: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy test 3", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupBy + Fields
|
||||||
|
func TestTopkGroupbyFields1(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 4 // This settings generate less than 3 groups
|
||||||
|
topk.Aggregation = "mean"
|
||||||
|
topk.AddAggregateFields = []string{"A"}
|
||||||
|
topk.GroupBy = []string{"tag1", "tag2"}
|
||||||
|
topk.Fields = []string{"A"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
|
||||||
|
1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
|
||||||
|
2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
|
||||||
|
5: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(29.45)})},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy Fields test 1", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTopkGroupbyFields2(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 2
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.AddAggregateFields = []string{"B", "C"}
|
||||||
|
topk.GroupBy = []string{"tag1", "tag3"}
|
||||||
|
topk.Fields = []string{"B", "C"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(72.41)})},
|
||||||
|
2: metricChange{newFields: fieldList(field{"B_topk_aggregate", float64(60.96)})},
|
||||||
|
4: metricChange{newFields: fieldList(field{"B_topk_aggregate", float64(81.55)}, field{"C_topk_aggregate", float64(49.96)})},
|
||||||
|
5: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(49.96)})},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy Fields test 2", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupBy metric name
|
||||||
|
func TestTopkGroupbyMetricName1(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 1
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.AddAggregateFields = []string{"value"}
|
||||||
|
topk.GroupBy = []string{}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
chng := fieldList(field{"value_topk_aggregate", float64(235.22000000000003)})
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
3: metricChange{newFields: chng},
|
||||||
|
4: metricChange{newFields: chng},
|
||||||
|
5: metricChange{newFields: chng},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy by metric name test 1", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTopkGroupbyMetricName2(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 2
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.AddAggregateFields = []string{"A", "value"}
|
||||||
|
topk.GroupBy = []string{"tag[12]"}
|
||||||
|
topk.Fields = []string{"A", "value"}
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
|
||||||
|
1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
|
||||||
|
2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
|
||||||
|
4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(87.92)})},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupBy by metric name test 2", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BottomK
|
||||||
|
func TestTopkBottomk(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 3
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.GroupBy = []string{"tag1", "tag3"}
|
||||||
|
topk.Bottomk = true
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
0: metricChange{},
|
||||||
|
1: metricChange{},
|
||||||
|
3: metricChange{},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "Bottom k test", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupByKeyTag
|
||||||
|
func TestTopkGroupByKeyTag(t *testing.T) {
|
||||||
|
|
||||||
|
// Build the processor
|
||||||
|
var topk TopK
|
||||||
|
topk = *New()
|
||||||
|
topk.Period = createDuration(1)
|
||||||
|
topk.K = 3
|
||||||
|
topk.Aggregation = "sum"
|
||||||
|
topk.GroupBy = []string{"tag1", "tag3"}
|
||||||
|
topk.AddGroupByTag = "gbt"
|
||||||
|
|
||||||
|
// Get the input
|
||||||
|
input := deepCopy(MetricsSet2)
|
||||||
|
|
||||||
|
// Generate the answer
|
||||||
|
changeSet := map[int]metricChange{
|
||||||
|
2: metricChange{newTags: tagList(tag{"gbt", "metric1&tag1=TWO&tag3=SIX&"})},
|
||||||
|
3: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=ONE&tag3=THREE&"})},
|
||||||
|
4: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
|
||||||
|
5: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
|
||||||
|
}
|
||||||
|
answer := generateAns(input, changeSet)
|
||||||
|
|
||||||
|
// Run the test
|
||||||
|
runAndCompare(&topk, input, answer, "GroupByKeyTag test", t)
|
||||||
|
}
|
Loading…
Reference in New Issue