From 68743825b83f158219fd4c99743e01132bd2ddaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Germ=C3=A1n=20Jaber?= Date: Fri, 4 May 2018 20:40:05 -0500 Subject: [PATCH] Add topk processor plugin (#4096) --- plugins/processors/all/all.go | 1 + plugins/processors/topk/README.md | 74 ++++ plugins/processors/topk/test_sets.go | 163 ++++++++ plugins/processors/topk/topk.go | 432 ++++++++++++++++++++++ plugins/processors/topk/topk_test.go | 532 +++++++++++++++++++++++++++ 5 files changed, 1202 insertions(+) create mode 100644 plugins/processors/topk/README.md create mode 100644 plugins/processors/topk/test_sets.go create mode 100644 plugins/processors/topk/topk.go create mode 100644 plugins/processors/topk/topk_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 1eecbfa7e..f14014e29 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -3,4 +3,5 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/printer" + _ "github.com/influxdata/telegraf/plugins/processors/topk" ) diff --git a/plugins/processors/topk/README.md b/plugins/processors/topk/README.md new file mode 100644 index 000000000..9c9e48af9 --- /dev/null +++ b/plugins/processors/topk/README.md @@ -0,0 +1,74 @@ +# TopK Processor Plugin + +The TopK processor plugin is a filter designed to get the top series over a period of time. It can be tweaked to do its top k computation over a period of time, so spikes can be smoothed out. + +This processor goes through these steps when processing a batch of metrics: + + 1. Groups metrics in buckets using their tags and name as key + 2. Aggregates each of the selected fields for each bucket by the selected aggregation function (sum, mean, etc) + 3. 
Orders the buckets by one of the generated aggregations, returns all metrics in the top `K` buckets, then reorders the buckets by the next of the generated aggregations, returns all metrics in the top `K` buckets, and so on until it runs out of fields. + +The plugin makes sure not to duplicate metrics. + +Note that, depending on the number of metrics in each computed bucket, more than `K` metrics may be returned. + +### Configuration: + +```toml +[[processors.topk]] + ## How many seconds between aggregations + # period = 10 + + ## How many top metrics to return + # k = 10 + + ## Over which tags should the aggregation be done. Globs can be specified, in + ## which case any tag matching the glob will be aggregated over. If set to an + ## empty list, no aggregation over tags is done + # group_by = ['*'] + + ## Over which fields the top k are calculated + # fields = ["value"] + + ## What aggregation to use. Options: sum, mean, min, max + # aggregation = "mean" + + ## Instead of the top k largest metrics, return the bottom k lowest metrics + # bottomk = false + + ## The plugin assigns each metric a GroupBy tag generated from its name and + ## tags. If this setting is different than "" the plugin will add a + ## tag (whose name will be the value of this setting) to each metric with + ## the value of the calculated GroupBy tag. Useful for debugging + # add_groupby_tag = "" + + ## These settings provide a way to know the position of each metric in + ## the top k. The 'add_rank_fields' setting specifies for which + ## fields the position is required. If the list is non empty, then a field + ## will be added to each and every metric for each string present in this + ## setting. This field will contain the ranking of the group that + ## the metric belonged to when aggregated over that field.
+ ## The name of the field will be set to the name of the aggregation field, + ## suffixed with the string '_topk_rank' + # add_rank_fields = [] + + ## These settings provide a way to know what values the plugin is generating + ## when aggregating metrics. The 'add_aggregate_fields' setting specifies + ## for which fields the final aggregation value is required. If the + ## list is non empty, then a field will be added to each and every metric for + ## each field present in this setting. This field will contain + ## the computed aggregation for the group that the metric belonged to when + ## aggregated over that field. + ## The name of the field will be set to the name of the aggregation field, + ## suffixed with the string '_topk_aggregate' + # add_aggregate_fields = [] +``` + +### Tags: + +This processor does not add tags by default. However, the setting `add_groupby_tag` will add a tag if set to anything other than "". + + +### Fields: + +This processor does not add fields by default. However, the settings `add_rank_fields` and `add_aggregate_fields` will add one or several fields if set to a non-empty list. diff --git a/plugins/processors/topk/test_sets.go b/plugins/processors/topk/test_sets.go new file mode 100644 index 000000000..aea2c44c8 --- /dev/null +++ b/plugins/processors/topk/test_sets.go @@ -0,0 +1,163 @@ +package topk + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "time" +) + +///// Test set 1 ///// +var metric11, _ = metric.New( + "m1", + map[string]string{"tag_name": "tag_value1"}, + map[string]interface{}{ + "a": float64(15.3), + "b": float64(40), + }, + time.Now(), +) + +var metric12, _ = metric.New( + "m1", + map[string]string{"tag_name": "tag_value1"}, + map[string]interface{}{ + "a": float64(50), + }, + time.Now(), +) + +var metric13, _ = metric.New( + "m1", + map[string]string{"tag_name": "tag_value1"}, + map[string]interface{}{ + "a": float64(0.3), + "c": float64(400), + }, + time.Now(), +) + +var
// MetricsSet1 is the first test fixture: metric11 through metric15, in that
// order. All five metrics use the measurement name "m1", carry the single tag
// tag_name=tag_value1 and have a float64 field "a"; the remaining fields
// differ from metric to metric.
var MetricsSet1 = []telegraf.Metric{metric11, metric12, metric13, metric14, metric15}
map[string]interface{}{ + "value": float64(75.3), + "A": float64(29.45), + "C": float64(4.86), + }, + time.Now(), +) + +var MetricsSet2 = []telegraf.Metric{metric21, metric22, metric23, metric24, metric25, metric26} diff --git a/plugins/processors/topk/topk.go b/plugins/processors/topk/topk.go new file mode 100644 index 000000000..8a52fa8d4 --- /dev/null +++ b/plugins/processors/topk/topk.go @@ -0,0 +1,432 @@ +package topk + +import ( + "fmt" + "log" + "math" + "sort" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/processors" +) + +type TopK struct { + Period internal.Duration + K int + GroupBy []string `toml:"group_by"` + Fields []string + Aggregation string + Bottomk bool + AddGroupByTag string `toml:"add_groupby_tag"` + AddRankFields []string `toml:"add_rank_fields"` + AddAggregateFields []string `toml:"add_aggregate_fields"` + + cache map[string][]telegraf.Metric + tagsGlobs filter.Filter + rankFieldSet map[string]bool + aggFieldSet map[string]bool + lastAggregation time.Time +} + +func New() *TopK { + // Create object + topk := TopK{} + + // Setup defaults + topk.Period = internal.Duration{Duration: time.Second * time.Duration(10)} + topk.K = 10 + topk.Fields = []string{"value"} + topk.Aggregation = "mean" + topk.GroupBy = []string{"*"} + topk.AddGroupByTag = "" + topk.AddRankFields = []string{""} + topk.AddAggregateFields = []string{""} + + // Initialize cache + topk.Reset() + + return &topk +} + +var sampleConfig = ` + ## How many seconds between aggregations + # period = 10 + + ## How many top metrics to return + # k = 10 + + ## Over which tags should the aggregation be done. Globs can be specified, in + ## which case any tag matching the glob will aggregated over. 
If set to an + ## empty list is no aggregation over tags is done + # group_by = ['*'] + + ## Over which fields are the top k are calculated + # fields = ["value"] + + ## What aggregation to use. Options: sum, mean, min, max + # aggregation = "mean" + + ## Instead of the top k largest metrics, return the bottom k lowest metrics + # bottomk = false + + ## The plugin assigns each metric a GroupBy tag generated from its name and + ## tags. If this setting is different than "" the plugin will add a + ## tag (which name will be the value of this setting) to each metric with + ## the value of the calculated GroupBy tag. Useful for debugging + # add_groupby_tag = "" + + ## These settings provide a way to know the position of each metric in + ## the top k. The 'add_rank_field' setting allows to specify for which + ## fields the position is required. If the list is non empty, then a field + ## will be added to each and every metric for each string present in this + ## setting. This field will contain the ranking of the group that + ## the metric belonged to when aggregated over that field. + ## The name of the field will be set to the name of the aggregation field, + ## suffixed with the string '_topk_rank' + # add_rank_fields = [] + + ## These settings provide a way to know what values the plugin is generating + ## when aggregating metrics. The 'add_agregate_field' setting allows to + ## specify for which fields the final aggregation value is required. If the + ## list is non empty, then a field will be added to each every metric for + ## each field present in this setting. This field will contain + ## the computed aggregation for the group that the metric belonged to when + ## aggregated over that field. 
// MetricAggregation holds the aggregated values computed for one group of
// metrics.
type MetricAggregation struct {
	groupbykey string             // key identifying the group of metrics
	values     map[string]float64 // aggregated value, indexed by field name
}

// sortMetrics stably sorts metrics in place by the aggregated value stored
// under field. The order is descending (largest first) by default and
// ascending (smallest first) when reverse is true. An aggregation without a
// value for field is compared as 0, the zero value of the values map.
//
// Both directions use a strict comparison, so aggregations with equal values
// keep their relative order, as sort.SliceStable promises. Negating the
// ascending comparison would report "less" for equal elements and reorder
// ties.
func sortMetrics(metrics []MetricAggregation, field string, reverse bool) {
	sort.SliceStable(metrics, func(i, j int) bool {
		iv := metrics[i].values[field]
		jv := metrics[j].values[field]
		if reverse {
			return iv < jv
		}
		return iv > jv
	})
}
// convert turns a metric field value into a float64 so it can be aggregated.
// Only float64, int64 and uint64 values are accepted; for any other type the
// second return value is false and the first one is 0.
func convert(in interface{}) (float64, bool) {
	if asFloat, isFloat := in.(float64); isFloat {
		return asFloat, true
	}
	if asInt, isInt := in.(int64); isInt {
		return float64(asInt), true
	}
	if asUint, isUint := in.(uint64); isUint {
		return float64(asUint), true
	}
	return 0, false
}
could not generate the aggregation + // function, fail hard by dropping all metrics + log.Printf("E! [processors.topk]: %v", err) + return []telegraf.Metric{} + } + for k, ms := range t.cache { + aggregations = append(aggregations, MetricAggregation{groupbykey: k, values: aggregator(ms, t.Fields)}) + } + + // The return value that will hold the returned metrics + var ret []telegraf.Metric = make([]telegraf.Metric, 0, 0) + + // Get the top K metrics for each field and add them to the return value + addedKeys := make(map[string]bool) + groupTag := t.AddGroupByTag + for _, field := range t.Fields { + + // Sort the aggregations + sortMetrics(aggregations, field, t.Bottomk) + + // Create a one dimentional list with the top K metrics of each key + for i, ag := range aggregations[0:min(t.K, len(aggregations))] { + + // Check whether of not we need to add fields of tags to the selected metrics + if len(t.aggFieldSet) != 0 || len(t.rankFieldSet) != 0 || groupTag != "" { + for _, m := range t.cache[ag.groupbykey] { + + // Add the aggregation final value if requested + _, addAggField := t.aggFieldSet[field] + if addAggField && m.HasField(field) { + m.AddField(field+"_topk_aggregate", ag.values[field]) + } + + // Add the rank relative to the current field if requested + _, addRankField := t.rankFieldSet[field] + if addRankField && m.HasField(field) { + m.AddField(field+"_topk_rank", i+1) + } + } + } + + // Add metrics if we have not already appended them to the return value + _, ok := addedKeys[ag.groupbykey] + if !ok { + ret = append(ret, t.cache[ag.groupbykey]...) 
+ addedKeys[ag.groupbykey] = true + } + } + } + + t.Reset() + + return ret +} + +// Function that generates the aggregation functions +func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metric, []string) map[string]float64, error) { + + // This is a function aggregates a set of metrics using a given aggregation function + var aggregator = func(ms []telegraf.Metric, fields []string, f func(map[string]float64, float64, string)) map[string]float64 { + agg := make(map[string]float64) + // Compute the sums of the selected fields over all the measurements collected for this metric + for _, m := range ms { + for _, field := range fields { + fieldVal, ok := m.Fields()[field] + if !ok { + continue // Skip if this metric doesn't have this field set + } + val, ok := convert(fieldVal) + if !ok { + log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'", + m.Fields()[field], m.Name(), m.Tags()) + continue + } + f(agg, val, field) + } + } + return agg + } + + switch aggOperation { + case "sum": + return func(ms []telegraf.Metric, fields []string) map[string]float64 { + sum := func(agg map[string]float64, val float64, field string) { + agg[field] += val + } + return aggregator(ms, fields, sum) + }, nil + + case "min": + return func(ms []telegraf.Metric, fields []string) map[string]float64 { + min := func(agg map[string]float64, val float64, field string) { + // If this field has not been set, set it to the maximum float64 + _, ok := agg[field] + if !ok { + agg[field] = math.MaxFloat64 + } + + // Check if we've found a new minimum + if agg[field] > val { + agg[field] = val + } + } + return aggregator(ms, fields, min) + }, nil + + case "max": + return func(ms []telegraf.Metric, fields []string) map[string]float64 { + max := func(agg map[string]float64, val float64, field string) { + // If this field has not been set, set it to the minimum float64 + _, ok := agg[field] + if !ok { + agg[field] = -math.MaxFloat64 + } + + // Check if we've found 
a new maximum + if agg[field] < val { + agg[field] = val + } + } + return aggregator(ms, fields, max) + }, nil + + case "mean": + return func(ms []telegraf.Metric, fields []string) map[string]float64 { + mean := make(map[string]float64) + meanCounters := make(map[string]float64) + // Compute the sums of the selected fields over all the measurements collected for this metric + for _, m := range ms { + for _, field := range fields { + fieldVal, ok := m.Fields()[field] + if !ok { + continue // Skip if this metric doesn't have this field set + } + val, ok := convert(fieldVal) + if !ok { + log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'", + m.Fields()[field], m.Name(), m.Tags()) + continue + } + mean[field] += val + meanCounters[field] += 1 + } + } + // Divide by the number of recorded measurements collected for every field + noMeasurementsFound := true // Canary to check if no field with values was found, so we can return nil + for k, _ := range mean { + if meanCounters[k] == 0 { + mean[k] = 0 + continue + } + mean[k] = mean[k] / meanCounters[k] + noMeasurementsFound = noMeasurementsFound && false + } + + if noMeasurementsFound { + return nil + } + return mean + }, nil + + default: + return nil, fmt.Errorf("Unknown aggregation function '%s'. 
No metrics will be processed", t.Aggregation) + } +} + +func init() { + processors.Add("topk", func() telegraf.Processor { + return New() + }) +} diff --git a/plugins/processors/topk/topk_test.go b/plugins/processors/topk/topk_test.go new file mode 100644 index 000000000..2f5844448 --- /dev/null +++ b/plugins/processors/topk/topk_test.go @@ -0,0 +1,532 @@ +package topk + +import ( + "reflect" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" +) + +// Key, value pair that represents a telegraf.Metric Field +type field struct { + key string + val interface{} +} + +func fieldList(fields ...field) []field { + return fields +} + +// Key, value pair that represents a telegraf.Metric Tag +type tag struct { + key string + val string +} + +func tagList(tags ...tag) []tag { + return tags +} + +// Abstraction of a change in a single metric +type metricChange struct { + newFields []field // Fieldsthat should be added to the metric + newTags []tag // Tags that should be added to the metric + + runHash bool // Sometimes the metrics' HashID must be run so reflect.DeepEqual works + // This happens because telegraf.Metric mantains an internal cache of + // its hash value that is set when HashID() is called for the first time +} + +// Generate a new set of metrics from a set of changes. This is used to generate an answer which will be +// compare against the output of the processor +// NOTE: A `changeSet` is a map where the keys are the indices of the metrics to keep, and the values +// are list of new tags and fields to be added to the metric in that index. +// THE ORDERING OF THE NEW TAGS AND FIELDS MATTERS. When using reflect.DeepEqual to compare metrics, +// comparing metrics that have the same fields/tags added in different orders will return false, although +// they are semantically equal. 
+// Therefore the fields and tags must be in the same order that the processor would add them +func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []telegraf.Metric { + answer := []telegraf.Metric{} + + // For every input metric, we check if there is a change we need to apply + // If there is no change for a given input metric, the metric is dropped + for i, metric := range input { + change, ok := changeSet[i] + if ok { + // Deep copy the metric + newMetric := metric.Copy() + + // Add new fields + if change.newFields != nil { + for _, p := range change.newFields { + newMetric.AddField(p.key, p.val) + } + } + + // Add new tags + if change.newTags != nil { + for _, p := range change.newTags { + newMetric.AddTag(p.key, p.val) + } + } + + // Run the hash function if required + if change.runHash { + newMetric.HashID() + } + + answer = append(answer, newMetric) + } + } + + return answer +} + +func deepCopy(a []telegraf.Metric) []telegraf.Metric { + ret := make([]telegraf.Metric, 0, len(a)) + for _, m := range a { + ret = append(ret, m.Copy()) + } + + return ret +} + +func belongs(m telegraf.Metric, ms []telegraf.Metric) bool { + for _, i := range ms { + if reflect.DeepEqual(i, m) { + return true + } + } + return false +} + +func subSet(a []telegraf.Metric, b []telegraf.Metric) bool { + subset := true + for _, m := range a { + if !belongs(m, b) { + subset = false + break + } + } + return subset +} + +func equalSets(l1 []telegraf.Metric, l2 []telegraf.Metric) bool { + return subSet(l1, l2) && subSet(l2, l1) +} + +func createDuration(t int) internal.Duration { + return internal.Duration{Duration: time.Second * time.Duration(t)} +} + +func runAndCompare(topk *TopK, metrics []telegraf.Metric, answer []telegraf.Metric, testID string, t *testing.T) { + // Sleep for `period`, otherwise the processor will only + // cache the metrics, but it will not process them + time.Sleep(topk.Period.Duration) + + // Run the processor + ret := topk.Apply(metrics...) 
+ topk.Reset() + + // The returned set mut be equal to the answer set + if !equalSets(ret, answer) { + t.Error("\nExpected metrics for", testID, ":\n", + answer, "\nReturned metrics:\n", ret) + } +} + +// Smoke tests +func TestTopkAggregatorsSmokeTests(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.Fields = []string{"a"} + topk.GroupBy = []string{"tag_name"} + + aggregators := []string{"mean", "sum", "max", "min"} + + //The answer is equal to the original set for these particual scenarios + input := MetricsSet1 + answer := MetricsSet1 + + for _, ag := range aggregators { + topk.Aggregation = ag + + runAndCompare(&topk, input, answer, "SmokeAggregator_"+ag, t) + } +} + +// AddAggregateFields + Mean aggregator +func TestTopkMeanAddAggregateFields(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.Aggregation = "mean" + topk.AddAggregateFields = []string{"a"} + topk.Fields = []string{"a"} + topk.GroupBy = []string{"tag_name"} + + // Get the input + input := deepCopy(MetricsSet1) + + // Generate the answer + chng := fieldList(field{"a_topk_aggregate", float64(28.044)}) + changeSet := map[int]metricChange{ + 0: metricChange{newFields: chng}, + 1: metricChange{newFields: chng}, + 2: metricChange{newFields: chng}, + 3: metricChange{newFields: chng}, + 4: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "MeanAddAggregateFields test", t) +} + +// AddAggregateFields + Sum aggregator +func TestTopkSumAddAggregateFields(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.Aggregation = "sum" + topk.AddAggregateFields = []string{"a"} + topk.Fields = []string{"a"} + topk.GroupBy = []string{"tag_name"} + + // Get the input + input := deepCopy(MetricsSet1) + + // Generate the answer + chng := 
fieldList(field{"a_topk_aggregate", float64(140.22)}) + changeSet := map[int]metricChange{ + 0: metricChange{newFields: chng}, + 1: metricChange{newFields: chng}, + 2: metricChange{newFields: chng}, + 3: metricChange{newFields: chng}, + 4: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "SumAddAggregateFields test", t) +} + +// AddAggregateFields + Max aggregator +func TestTopkMaxAddAggregateFields(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.Aggregation = "max" + topk.AddAggregateFields = []string{"a"} + topk.Fields = []string{"a"} + topk.GroupBy = []string{"tag_name"} + + // Get the input + input := deepCopy(MetricsSet1) + + // Generate the answer + chng := fieldList(field{"a_topk_aggregate", float64(50.5)}) + changeSet := map[int]metricChange{ + 0: metricChange{newFields: chng}, + 1: metricChange{newFields: chng}, + 2: metricChange{newFields: chng}, + 3: metricChange{newFields: chng}, + 4: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "MaxAddAggregateFields test", t) +} + +// AddAggregateFields + Min aggregator +func TestTopkMinAddAggregateFields(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.Aggregation = "min" + topk.AddAggregateFields = []string{"a"} + topk.Fields = []string{"a"} + topk.GroupBy = []string{"tag_name"} + + // Get the input + input := deepCopy(MetricsSet1) + + // Generate the answer + chng := fieldList(field{"a_topk_aggregate", float64(0.3)}) + changeSet := map[int]metricChange{ + 0: metricChange{newFields: chng}, + 1: metricChange{newFields: chng}, + 2: metricChange{newFields: chng}, + 3: metricChange{newFields: chng}, + 4: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + 
runAndCompare(&topk, input, answer, "MinAddAggregateFields test", t) +} + +// GroupBy +func TestTopkGroupby1(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 3 + topk.Aggregation = "sum" + topk.AddAggregateFields = []string{"value"} + topk.GroupBy = []string{"tag[13]"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 2: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(74.18)})}, + 3: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(72)})}, + 4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})}, + 5: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy test 1", t) +} +func TestTopkGroupby2(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 3 + topk.Aggregation = "mean" + topk.AddAggregateFields = []string{"value"} + topk.GroupBy = []string{"tag1"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + chng1 := fieldList(field{"value_topk_aggregate", float64(66.805)}) + chng2 := fieldList(field{"value_topk_aggregate", float64(72)}) + chng3 := fieldList(field{"value_topk_aggregate", float64(81.61)}) + changeSet := map[int]metricChange{ + 1: metricChange{newFields: chng1}, + 2: metricChange{newFields: chng1}, + 3: metricChange{newFields: chng2}, + 4: metricChange{newFields: chng3}, + 5: metricChange{newFields: chng3}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy test 2", t) +} +func TestTopkGroupby3(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 1 + topk.Aggregation = "min" + 
topk.AddAggregateFields = []string{"value"} + topk.GroupBy = []string{"tag4"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + chng := fieldList(field{"value_topk_aggregate", float64(75.3)}) + changeSet := map[int]metricChange{ + 4: metricChange{newFields: chng}, + 5: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy test 3", t) +} + +// GroupBy + Fields +func TestTopkGroupbyFields1(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 4 // This settings generate less than 3 groups + topk.Aggregation = "mean" + topk.AddAggregateFields = []string{"A"} + topk.GroupBy = []string{"tag1", "tag2"} + topk.Fields = []string{"A"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})}, + 1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})}, + 2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})}, + 5: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(29.45)})}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy Fields test 1", t) +} + +func TestTopkGroupbyFields2(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 2 + topk.Aggregation = "sum" + topk.AddAggregateFields = []string{"B", "C"} + topk.GroupBy = []string{"tag1", "tag3"} + topk.Fields = []string{"B", "C"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 0: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(72.41)})}, + 2: metricChange{newFields: fieldList(field{"B_topk_aggregate", 
float64(60.96)})}, + 4: metricChange{newFields: fieldList(field{"B_topk_aggregate", float64(81.55)}, field{"C_topk_aggregate", float64(49.96)})}, + 5: metricChange{newFields: fieldList(field{"C_topk_aggregate", float64(49.96)})}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy Fields test 2", t) +} + +// GroupBy metric name +func TestTopkGroupbyMetricName1(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 1 + topk.Aggregation = "sum" + topk.AddAggregateFields = []string{"value"} + topk.GroupBy = []string{} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + chng := fieldList(field{"value_topk_aggregate", float64(235.22000000000003)}) + changeSet := map[int]metricChange{ + 3: metricChange{newFields: chng}, + 4: metricChange{newFields: chng}, + 5: metricChange{newFields: chng}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy by metric name test 1", t) +} + +func TestTopkGroupbyMetricName2(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 2 + topk.Aggregation = "sum" + topk.AddAggregateFields = []string{"A", "value"} + topk.GroupBy = []string{"tag[12]"} + topk.Fields = []string{"A", "value"} + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 0: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})}, + 1: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})}, + 2: metricChange{newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})}, + 4: metricChange{newFields: fieldList(field{"value_topk_aggregate", float64(87.92)})}, + } + answer := generateAns(input, 
changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupBy by metric name test 2", t) +} + +// BottomK +func TestTopkBottomk(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 3 + topk.Aggregation = "sum" + topk.GroupBy = []string{"tag1", "tag3"} + topk.Bottomk = true + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 0: metricChange{}, + 1: metricChange{}, + 3: metricChange{}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "Bottom k test", t) +} + +// GroupByKeyTag +func TestTopkGroupByKeyTag(t *testing.T) { + + // Build the processor + var topk TopK + topk = *New() + topk.Period = createDuration(1) + topk.K = 3 + topk.Aggregation = "sum" + topk.GroupBy = []string{"tag1", "tag3"} + topk.AddGroupByTag = "gbt" + + // Get the input + input := deepCopy(MetricsSet2) + + // Generate the answer + changeSet := map[int]metricChange{ + 2: metricChange{newTags: tagList(tag{"gbt", "metric1&tag1=TWO&tag3=SIX&"})}, + 3: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=ONE&tag3=THREE&"})}, + 4: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})}, + 5: metricChange{newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})}, + } + answer := generateAns(input, changeSet) + + // Run the test + runAndCompare(&topk, input, answer, "GroupByKeyTag test", t) +}