Add topk processor plugin (#4096)

This commit is contained in:
Germán Jaber 2018-05-04 20:40:05 -05:00 committed by Daniel Nelson
parent dfa23e7a76
commit 68743825b8
5 changed files with 1202 additions and 0 deletions

View File

@ -3,4 +3,5 @@ package all
import ( import (
_ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/topk"
) )

View File

@ -0,0 +1,74 @@
# TopK Processor Plugin
The TopK processor plugin is a filter designed to get the top series over a period of time. It can be tweaked to do its top k computation over a period of time, so spikes can be smoothed out.
This processor goes through these steps when processing a batch of metrics:
1. Groups metrics in buckets using their tags and name as key
2. Aggregates each of the selected fields for each bucket by the selected aggregation function (sum, mean, etc)
3. Orders the buckets by one of the generated aggregations, returns all metrics in the top `K` buckets, then reorders the buckets by the next of the generated aggregations, returns all metrics in the top `K` buckets, etc, etc, etc, until it runs out of fields.
The plugin makes sure not to duplicate metrics
Note that depending on the amount of metrics on each computed bucket, more than `K` metrics may be returned
### Configuration:
```toml
[[processors.topk]]
## How many seconds between aggregations
# period = 10
## How many top metrics to return
# k = 10
## Over which tags should the aggregation be done. Globs can be specified, in
## which case any tag matching the glob will aggregated over. If set to an
## empty list is no aggregation over tags is done
# group_by = ['*']
## Over which fields are the top k are calculated
# fields = ["value"]
## What aggregation to use. Options: sum, mean, min, max
# aggregation = "mean"
## Instead of the top k largest metrics, return the bottom k lowest metrics
# bottomk = false
## The plugin assigns each metric a GroupBy tag generated from its name and
## tags. If this setting is different than "" the plugin will add a
## tag (which name will be the value of this setting) to each metric with
## the value of the calculated GroupBy tag. Useful for debugging
# add_groupby_tag = ""
## These settings provide a way to know the position of each metric in
## the top k. The 'add_rank_field' setting allows to specify for which
## fields the position is required. If the list is non empty, then a field
## will be added to each and every metric for each string present in this
## setting. This field will contain the ranking of the group that
## the metric belonged to when aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_rank'
# add_rank_fields = []
## These settings provide a way to know what values the plugin is generating
## when aggregating metrics. The 'add_agregate_field' setting allows to
## specify for which fields the final aggregation value is required. If the
## list is non empty, then a field will be added to each every metric for
## each field present in this setting. This field will contain
## the computed aggregation for the group that the metric belonged to when
## aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_aggregate'
# add_aggregate_fields = []
```
### Tags:
This processor does not add tags by default. But the setting `add_groupby_tag` will add a tag if set to anything other than ""
### Fields:
This processor does not add fields by default. But the settings `add_rank_fields` and `add_aggregation_fields` will add one or several fields if set to anything other than ""

View File

@ -0,0 +1,163 @@
package topk
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"time"
)
///// Test set 1 /////
var metric11, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(15.3),
"b": float64(40),
},
time.Now(),
)
var metric12, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(50),
},
time.Now(),
)
var metric13, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(0.3),
"c": float64(400),
},
time.Now(),
)
var metric14, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(24.12),
"b": float64(40),
},
time.Now(),
)
var metric15, _ = metric.New(
"m1",
map[string]string{"tag_name": "tag_value1"},
map[string]interface{}{
"a": float64(50.5),
"h": float64(1),
"u": float64(2.4),
},
time.Now(),
)
var MetricsSet1 = []telegraf.Metric{metric11, metric12, metric13, metric14, metric15}
///// Test set 2 /////
var metric21, _ = metric.New(
"metric1",
map[string]string{
"id": "1",
"tag1": "ONE",
"tag2": "FIVE",
"tag3": "SIX",
"tag4": "EIGHT",
},
map[string]interface{}{
"value": float64(31.31),
"A": float64(95.36),
"C": float64(72.41),
},
time.Now(),
)
var metric22, _ = metric.New(
"metric1",
map[string]string{
"id": "2",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "THREE",
"tag4": "EIGHT",
},
map[string]interface{}{
"value": float64(59.43),
"A": float64(0.6),
},
time.Now(),
)
var metric23, _ = metric.New(
"metric1",
map[string]string{
"id": "3",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "SIX",
"tag5": "TEN",
},
map[string]interface{}{
"value": float64(74.18),
"A": float64(77.42),
"B": float64(60.96),
},
time.Now(),
)
var metric24, _ = metric.New(
"metric2",
map[string]string{
"id": "4",
"tag1": "ONE",
"tag2": "FIVE",
"tag3": "THREE",
},
map[string]interface{}{
"value": float64(72),
"B": float64(22.1),
"C": float64(30.8),
},
time.Now(),
)
var metric25, _ = metric.New(
"metric2",
map[string]string{
"id": "5",
"tag1": "TWO",
"tag2": "FOUR",
"tag3": "SEVEN",
"tag4": "NINE",
},
map[string]interface{}{
"value": float64(87.92),
"B": float64(81.55),
"C": float64(45.1),
},
time.Now(),
)
var metric26, _ = metric.New(
"metric2",
map[string]string{
"id": "6",
"tag1": "TWO",
"tag2": "FIVE",
"tag3": "SEVEN",
"tag4": "NINE",
},
map[string]interface{}{
"value": float64(75.3),
"A": float64(29.45),
"C": float64(4.86),
},
time.Now(),
)
var MetricsSet2 = []telegraf.Metric{metric21, metric22, metric23, metric24, metric25, metric26}

View File

@ -0,0 +1,432 @@
package topk
import (
"fmt"
"log"
"math"
"sort"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/processors"
)
type TopK struct {
Period internal.Duration
K int
GroupBy []string `toml:"group_by"`
Fields []string
Aggregation string
Bottomk bool
AddGroupByTag string `toml:"add_groupby_tag"`
AddRankFields []string `toml:"add_rank_fields"`
AddAggregateFields []string `toml:"add_aggregate_fields"`
cache map[string][]telegraf.Metric
tagsGlobs filter.Filter
rankFieldSet map[string]bool
aggFieldSet map[string]bool
lastAggregation time.Time
}
func New() *TopK {
// Create object
topk := TopK{}
// Setup defaults
topk.Period = internal.Duration{Duration: time.Second * time.Duration(10)}
topk.K = 10
topk.Fields = []string{"value"}
topk.Aggregation = "mean"
topk.GroupBy = []string{"*"}
topk.AddGroupByTag = ""
topk.AddRankFields = []string{""}
topk.AddAggregateFields = []string{""}
// Initialize cache
topk.Reset()
return &topk
}
var sampleConfig = `
## How many seconds between aggregations
# period = 10
## How many top metrics to return
# k = 10
## Over which tags should the aggregation be done. Globs can be specified, in
## which case any tag matching the glob will aggregated over. If set to an
## empty list is no aggregation over tags is done
# group_by = ['*']
## Over which fields are the top k are calculated
# fields = ["value"]
## What aggregation to use. Options: sum, mean, min, max
# aggregation = "mean"
## Instead of the top k largest metrics, return the bottom k lowest metrics
# bottomk = false
## The plugin assigns each metric a GroupBy tag generated from its name and
## tags. If this setting is different than "" the plugin will add a
## tag (which name will be the value of this setting) to each metric with
## the value of the calculated GroupBy tag. Useful for debugging
# add_groupby_tag = ""
## These settings provide a way to know the position of each metric in
## the top k. The 'add_rank_field' setting allows to specify for which
## fields the position is required. If the list is non empty, then a field
## will be added to each and every metric for each string present in this
## setting. This field will contain the ranking of the group that
## the metric belonged to when aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_rank'
# add_rank_fields = []
## These settings provide a way to know what values the plugin is generating
## when aggregating metrics. The 'add_agregate_field' setting allows to
## specify for which fields the final aggregation value is required. If the
## list is non empty, then a field will be added to each every metric for
## each field present in this setting. This field will contain
## the computed aggregation for the group that the metric belonged to when
## aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
## suffixed with the string '_topk_aggregate'
# add_aggregate_fields = []
`
type MetricAggregation struct {
groupbykey string
values map[string]float64
}
func sortMetrics(metrics []MetricAggregation, field string, reverse bool) {
less := func(i, j int) bool {
iv := metrics[i].values[field]
jv := metrics[j].values[field]
if iv < jv {
return true
} else {
return false
}
}
if reverse {
sort.SliceStable(metrics, less)
} else {
sort.SliceStable(metrics, func(i, j int) bool { return !less(i, j) })
}
}
func (t *TopK) SampleConfig() string {
return sampleConfig
}
func (t *TopK) Reset() {
t.cache = make(map[string][]telegraf.Metric)
t.lastAggregation = time.Now()
}
func (t *TopK) Description() string {
return "Print all metrics that pass through this filter."
}
func (t *TopK) generateGroupByKey(m telegraf.Metric) (string, error) {
// Create the filter.Filter objects if they have not been created
if t.tagsGlobs == nil && len(t.GroupBy) > 0 {
var err error
t.tagsGlobs, err = filter.Compile(t.GroupBy)
if err != nil {
return "", fmt.Errorf("could not compile pattern: %v %v", t.GroupBy, err)
}
}
groupkey := m.Name() + "&"
if len(t.GroupBy) > 0 {
tags := m.Tags()
keys := make([]string, 0, len(tags))
for tag, value := range tags {
if t.tagsGlobs.Match(tag) {
keys = append(keys, tag+"="+value+"&")
}
}
// Sorting the selected tags is necessary because dictionaries
// do not ensure any specific or deterministic ordering
sort.SliceStable(keys, func(i, j int) bool { return keys[i] < keys[j] })
for _, str := range keys {
groupkey += str
}
}
return groupkey, nil
}
func (t *TopK) groupBy(m telegraf.Metric) {
// Generate the metric group key
groupkey, err := t.generateGroupByKey(m)
if err != nil {
// If we could not generate the groupkey, fail hard
// by dropping this and all subsequent metrics
log.Printf("E! [processors.topk]: could not generate group key: %v", err)
return
}
// Initialize the key with an empty list if necessary
if _, ok := t.cache[groupkey]; !ok {
t.cache[groupkey] = make([]telegraf.Metric, 0, 10)
}
// Append the metric to the corresponding key list
t.cache[groupkey] = append(t.cache[groupkey], m)
// Add the generated groupby key tag to the metric if requested
if t.AddGroupByTag != "" {
m.AddTag(t.AddGroupByTag, groupkey)
}
}
func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric {
// Init any internal datastructures that are not initialized yet
if t.rankFieldSet == nil {
t.rankFieldSet = make(map[string]bool)
for _, f := range t.AddRankFields {
t.rankFieldSet[f] = true
}
}
if t.aggFieldSet == nil {
t.aggFieldSet = make(map[string]bool)
for _, f := range t.AddAggregateFields {
t.aggFieldSet[f] = true
}
}
// Add the metrics received to our internal cache
for _, m := range in {
// Check if the metric has any of the fields over wich we are aggregating
hasField := false
for _, f := range t.Fields {
if m.HasField(f) {
hasField = true
break
}
}
if !hasField {
continue
}
// Add the metric to the internal cache
t.groupBy(m)
}
// If enough time has passed
elapsed := time.Since(t.lastAggregation)
if elapsed >= t.Period.Duration {
return t.push()
}
return []telegraf.Metric{}
}
func min(a, b int) int {
if a > b {
return b
}
return a
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}
}
func (t *TopK) push() []telegraf.Metric {
// Generate aggregations list using the selected fields
aggregations := make([]MetricAggregation, 0, 100)
aggregator, err := t.getAggregationFunction(t.Aggregation)
if err != nil {
// If we could not generate the aggregation
// function, fail hard by dropping all metrics
log.Printf("E! [processors.topk]: %v", err)
return []telegraf.Metric{}
}
for k, ms := range t.cache {
aggregations = append(aggregations, MetricAggregation{groupbykey: k, values: aggregator(ms, t.Fields)})
}
// The return value that will hold the returned metrics
var ret []telegraf.Metric = make([]telegraf.Metric, 0, 0)
// Get the top K metrics for each field and add them to the return value
addedKeys := make(map[string]bool)
groupTag := t.AddGroupByTag
for _, field := range t.Fields {
// Sort the aggregations
sortMetrics(aggregations, field, t.Bottomk)
// Create a one dimentional list with the top K metrics of each key
for i, ag := range aggregations[0:min(t.K, len(aggregations))] {
// Check whether of not we need to add fields of tags to the selected metrics
if len(t.aggFieldSet) != 0 || len(t.rankFieldSet) != 0 || groupTag != "" {
for _, m := range t.cache[ag.groupbykey] {
// Add the aggregation final value if requested
_, addAggField := t.aggFieldSet[field]
if addAggField && m.HasField(field) {
m.AddField(field+"_topk_aggregate", ag.values[field])
}
// Add the rank relative to the current field if requested
_, addRankField := t.rankFieldSet[field]
if addRankField && m.HasField(field) {
m.AddField(field+"_topk_rank", i+1)
}
}
}
// Add metrics if we have not already appended them to the return value
_, ok := addedKeys[ag.groupbykey]
if !ok {
ret = append(ret, t.cache[ag.groupbykey]...)
addedKeys[ag.groupbykey] = true
}
}
}
t.Reset()
return ret
}
// Function that generates the aggregation functions
func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metric, []string) map[string]float64, error) {
// This is a function aggregates a set of metrics using a given aggregation function
var aggregator = func(ms []telegraf.Metric, fields []string, f func(map[string]float64, float64, string)) map[string]float64 {
agg := make(map[string]float64)
// Compute the sums of the selected fields over all the measurements collected for this metric
for _, m := range ms {
for _, field := range fields {
fieldVal, ok := m.Fields()[field]
if !ok {
continue // Skip if this metric doesn't have this field set
}
val, ok := convert(fieldVal)
if !ok {
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
m.Fields()[field], m.Name(), m.Tags())
continue
}
f(agg, val, field)
}
}
return agg
}
switch aggOperation {
case "sum":
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
sum := func(agg map[string]float64, val float64, field string) {
agg[field] += val
}
return aggregator(ms, fields, sum)
}, nil
case "min":
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
min := func(agg map[string]float64, val float64, field string) {
// If this field has not been set, set it to the maximum float64
_, ok := agg[field]
if !ok {
agg[field] = math.MaxFloat64
}
// Check if we've found a new minimum
if agg[field] > val {
agg[field] = val
}
}
return aggregator(ms, fields, min)
}, nil
case "max":
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
max := func(agg map[string]float64, val float64, field string) {
// If this field has not been set, set it to the minimum float64
_, ok := agg[field]
if !ok {
agg[field] = -math.MaxFloat64
}
// Check if we've found a new maximum
if agg[field] < val {
agg[field] = val
}
}
return aggregator(ms, fields, max)
}, nil
case "mean":
return func(ms []telegraf.Metric, fields []string) map[string]float64 {
mean := make(map[string]float64)
meanCounters := make(map[string]float64)
// Compute the sums of the selected fields over all the measurements collected for this metric
for _, m := range ms {
for _, field := range fields {
fieldVal, ok := m.Fields()[field]
if !ok {
continue // Skip if this metric doesn't have this field set
}
val, ok := convert(fieldVal)
if !ok {
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
m.Fields()[field], m.Name(), m.Tags())
continue
}
mean[field] += val
meanCounters[field] += 1
}
}
// Divide by the number of recorded measurements collected for every field
noMeasurementsFound := true // Canary to check if no field with values was found, so we can return nil
for k, _ := range mean {
if meanCounters[k] == 0 {
mean[k] = 0
continue
}
mean[k] = mean[k] / meanCounters[k]
noMeasurementsFound = noMeasurementsFound && false
}
if noMeasurementsFound {
return nil
}
return mean
}, nil
default:
return nil, fmt.Errorf("Unknown aggregation function '%s'. No metrics will be processed", t.Aggregation)
}
}
func init() {
processors.Add("topk", func() telegraf.Processor {
return New()
})
}

View File

@ -0,0 +1,532 @@
package topk
import (
"reflect"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
// Key, value pair that represents a telegraf.Metric Field
type field struct {
key string
val interface{}
}
func fieldList(fields ...field) []field {
return fields
}
// Key, value pair that represents a telegraf.Metric Tag
type tag struct {
key string
val string
}
func tagList(tags ...tag) []tag {
return tags
}
// Abstraction of a change in a single metric
type metricChange struct {
newFields []field // Fieldsthat should be added to the metric
newTags []tag // Tags that should be added to the metric
runHash bool // Sometimes the metrics' HashID must be run so reflect.DeepEqual works
// This happens because telegraf.Metric mantains an internal cache of
// its hash value that is set when HashID() is called for the first time
}
// Generate a new set of metrics from a set of changes. This is used to generate an answer which will be
// compare against the output of the processor
// NOTE: A `changeSet` is a map where the keys are the indices of the metrics to keep, and the values
// are list of new tags and fields to be added to the metric in that index.
// THE ORDERING OF THE NEW TAGS AND FIELDS MATTERS. When using reflect.DeepEqual to compare metrics,
// comparing metrics that have the same fields/tags added in different orders will return false, although
// they are semantically equal.
// Therefore the fields and tags must be in the same order that the processor would add them
func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []telegraf.Metric {
answer := []telegraf.Metric{}
// For every input metric, we check if there is a change we need to apply
// If there is no change for a given input metric, the metric is dropped
for i, metric := range input {
change, ok := changeSet[i]
if ok {
// Deep copy the metric
newMetric := metric.Copy()
// Add new fields
if change.newFields != nil {
for _, p := range change.newFields {
newMetric.AddField(p.key, p.val)
}
}
// Add new tags
if change.newTags != nil {
for _, p := range change.newTags {
newMetric.AddTag(p.key, p.val)
}
}
// Run the hash function if required
if change.runHash {
newMetric.HashID()
}
answer = append(answer, newMetric)
}
}
return answer
}
func deepCopy(a []telegraf.Metric) []telegraf.Metric {
ret := make([]telegraf.Metric, 0, len(a))
for _, m := range a {
ret = append(ret, m.Copy())
}
return ret
}
func belongs(m telegraf.Metric, ms []telegraf.Metric) bool {
for _, i := range ms {
if reflect.DeepEqual(i, m) {
return true
}
}
return false
}
func subSet(a []telegraf.Metric, b []telegraf.Metric) bool {
subset := true
for _, m := range a {
if !belongs(m, b) {
subset = false
break
}
}
return subset
}
func equalSets(l1 []telegraf.Metric, l2 []telegraf.Metric) bool {
return subSet(l1, l2) && subSet(l2, l1)
}
func createDuration(t int) internal.Duration {
return internal.Duration{Duration: time.Second * time.Duration(t)}
}
func runAndCompare(topk *TopK, metrics []telegraf.Metric, answer []telegraf.Metric, testID string, t *testing.T) {
// Sleep for `period`, otherwise the processor will only
// cache the metrics, but it will not process them
time.Sleep(topk.Period.Duration)
// Run the processor
ret := topk.Apply(metrics...)
topk.Reset()
// The returned set mut be equal to the answer set
if !equalSets(ret, answer) {
t.Error("\nExpected metrics for", testID, ":\n",
answer, "\nReturned metrics:\n", ret)
}
}
// Smoke tests
func TestTopkAggregatorsSmokeTests(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.Fields = []string{"a"}
topk.GroupBy = []string{"tag_name"}
aggregators := []string{"mean", "sum", "max", "min"}
//The answer is equal to the original set for these particual scenarios
input := MetricsSet1
answer := MetricsSet1
for _, ag := range aggregators {
topk.Aggregation = ag
runAndCompare(&topk, input, answer, "SmokeAggregator_"+ag, t)
}
}
// AddAggregateFields + Mean aggregator
func TestTopkMeanAddAggregateFields(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.Aggregation = "mean"
topk.AddAggregateFields = []string{"a"}
topk.Fields = []string{"a"}
topk.GroupBy = []string{"tag_name"}
// Get the input
input := deepCopy(MetricsSet1)
// Generate the answer
chng := fieldList(field{"a_topk_aggregate", float64(28.044)})
changeSet := map[int]metricChange{
0: metricChange{newFields: chng},
1: metricChange{newFields: chng},
2: metricChange{newFields: chng},
3: metricChange{newFields: chng},
4: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "MeanAddAggregateFields test", t)
}
// AddAggregateFields + Sum aggregator
func TestTopkSumAddAggregateFields(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.Aggregation = "sum"
topk.AddAggregateFields = []string{"a"}
topk.Fields = []string{"a"}
topk.GroupBy = []string{"tag_name"}
// Get the input
input := deepCopy(MetricsSet1)
// Generate the answer
chng := fieldList(field{"a_topk_aggregate", float64(140.22)})
changeSet := map[int]metricChange{
0: metricChange{newFields: chng},
1: metricChange{newFields: chng},
2: metricChange{newFields: chng},
3: metricChange{newFields: chng},
4: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "SumAddAggregateFields test", t)
}
// AddAggregateFields + Max aggregator
func TestTopkMaxAddAggregateFields(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.Aggregation = "max"
topk.AddAggregateFields = []string{"a"}
topk.Fields = []string{"a"}
topk.GroupBy = []string{"tag_name"}
// Get the input
input := deepCopy(MetricsSet1)
// Generate the answer
chng := fieldList(field{"a_topk_aggregate", float64(50.5)})
changeSet := map[int]metricChange{
0: metricChange{newFields: chng},
1: metricChange{newFields: chng},
2: metricChange{newFields: chng},
3: metricChange{newFields: chng},
4: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "MaxAddAggregateFields test", t)
}
// AddAggregateFields + Min aggregator
func TestTopkMinAddAggregateFields(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.Aggregation = "min"
topk.AddAggregateFields = []string{"a"}
topk.Fields = []string{"a"}
topk.GroupBy = []string{"tag_name"}
// Get the input
input := deepCopy(MetricsSet1)
// Generate the answer
chng := fieldList(field{"a_topk_aggregate", float64(0.3)})
changeSet := map[int]metricChange{
0: metricChange{newFields: chng},
1: metricChange{newFields: chng},
2: metricChange{newFields: chng},
3: metricChange{newFields: chng},
4: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "MinAddAggregateFields test", t)
}
// GroupBy
func TestTopkGroupby1(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 3
topk.Aggregation = "sum"
topk.AddAggregateFields = []string{"value"}
topk.GroupBy = []string{"tag[13]"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
2: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(74.18)})},
3: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(72)})},
4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
5: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy test 1", t)
}
func TestTopkGroupby2(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 3
topk.Aggregation = "mean"
topk.AddAggregateFields = []string{"value"}
topk.GroupBy = []string{"tag1"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
chng1 := fieldList(field{"value_topk_aggregate", float64(66.805)})
chng2 := fieldList(field{"value_topk_aggregate", float64(72)})
chng3 := fieldList(field{"value_topk_aggregate", float64(81.61)})
changeSet := map[int]metricChange{
1: metricChange{newFields: chng1},
2: metricChange{newFields: chng1},
3: metricChange{newFields: chng2},
4: metricChange{newFields: chng3},
5: metricChange{newFields: chng3},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy test 2", t)
}
func TestTopkGroupby3(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 1
topk.Aggregation = "min"
topk.AddAggregateFields = []string{"value"}
topk.GroupBy = []string{"tag4"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
chng := fieldList(field{"value_topk_aggregate", float64(75.3)})
changeSet := map[int]metricChange{
4: metricChange{newFields: chng},
5: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy test 3", t)
}
// GroupBy + Fields
func TestTopkGroupbyFields1(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 4 // This settings generate less than 3 groups
topk.Aggregation = "mean"
topk.AddAggregateFields = []string{"A"}
topk.GroupBy = []string{"tag1", "tag2"}
topk.Fields = []string{"A"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
5: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(29.45)})},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy Fields test 1", t)
}
func TestTopkGroupbyFields2(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 2
topk.Aggregation = "sum"
topk.AddAggregateFields = []string{"B", "C"}
topk.GroupBy = []string{"tag1", "tag3"}
topk.Fields = []string{"B", "C"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
0: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(72.41)})},
2: metricChange{newFields: fieldList(field{"B_topk_aggregate", float64(60.96)})},
4: metricChange{newFields: fieldList(field{"B_topk_aggregate", float64(81.55)}, field{"C_topk_aggregate", float64(49.96)})},
5: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(49.96)})},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy Fields test 2", t)
}
// GroupBy metric name
func TestTopkGroupbyMetricName1(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 1
topk.Aggregation = "sum"
topk.AddAggregateFields = []string{"value"}
topk.GroupBy = []string{}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
chng := fieldList(field{"value_topk_aggregate", float64(235.22000000000003)})
changeSet := map[int]metricChange{
3: metricChange{newFields: chng},
4: metricChange{newFields: chng},
5: metricChange{newFields: chng},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy by metric name test 1", t)
}
func TestTopkGroupbyMetricName2(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 2
topk.Aggregation = "sum"
topk.AddAggregateFields = []string{"A", "value"}
topk.GroupBy = []string{"tag[12]"}
topk.Fields = []string{"A", "value"}
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(87.92)})},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupBy by metric name test 2", t)
}
// BottomK
func TestTopkBottomk(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 3
topk.Aggregation = "sum"
topk.GroupBy = []string{"tag1", "tag3"}
topk.Bottomk = true
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
0: metricChange{},
1: metricChange{},
3: metricChange{},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "Bottom k test", t)
}
// GroupByKeyTag
func TestTopkGroupByKeyTag(t *testing.T) {
// Build the processor
var topk TopK
topk = *New()
topk.Period = createDuration(1)
topk.K = 3
topk.Aggregation = "sum"
topk.GroupBy = []string{"tag1", "tag3"}
topk.AddGroupByTag = "gbt"
// Get the input
input := deepCopy(MetricsSet2)
// Generate the answer
changeSet := map[int]metricChange{
2: metricChange{newTags: tagList(tag{"gbt", "metric1&tag1=TWO&tag3=SIX&"})},
3: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=ONE&tag3=THREE&"})},
4: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
5: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
}
answer := generateAns(input, changeSet)
// Run the test
runAndCompare(&topk, input, answer, "GroupByKeyTag test", t)
}