533 lines
14 KiB
Go
533 lines
14 KiB
Go
package topk
|
|
|
|
import (
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
)
|
|
|
|
// Key, value pair that represents a telegraf.Metric Field
|
|
type field struct {
|
|
key string
|
|
val interface{}
|
|
}
|
|
|
|
func fieldList(fields ...field) []field {
|
|
return fields
|
|
}
|
|
|
|
// Key, value pair that represents a telegraf.Metric Tag
|
|
type tag struct {
|
|
key string
|
|
val string
|
|
}
|
|
|
|
func tagList(tags ...tag) []tag {
|
|
return tags
|
|
}
|
|
|
|
// Abstraction of a change in a single metric
|
|
type metricChange struct {
|
|
newFields []field // Fieldsthat should be added to the metric
|
|
newTags []tag // Tags that should be added to the metric
|
|
|
|
runHash bool // Sometimes the metrics' HashID must be run so reflect.DeepEqual works
|
|
// This happens because telegraf.Metric mantains an internal cache of
|
|
// its hash value that is set when HashID() is called for the first time
|
|
}
|
|
|
|
// Generate a new set of metrics from a set of changes. This is used to generate an answer which will be
|
|
// compare against the output of the processor
|
|
// NOTE: A `changeSet` is a map where the keys are the indices of the metrics to keep, and the values
|
|
// are list of new tags and fields to be added to the metric in that index.
|
|
// THE ORDERING OF THE NEW TAGS AND FIELDS MATTERS. When using reflect.DeepEqual to compare metrics,
|
|
// comparing metrics that have the same fields/tags added in different orders will return false, although
|
|
// they are semantically equal.
|
|
// Therefore the fields and tags must be in the same order that the processor would add them
|
|
func generateAns(input []telegraf.Metric, changeSet map[int]metricChange) []telegraf.Metric {
|
|
answer := []telegraf.Metric{}
|
|
|
|
// For every input metric, we check if there is a change we need to apply
|
|
// If there is no change for a given input metric, the metric is dropped
|
|
for i, metric := range input {
|
|
change, ok := changeSet[i]
|
|
if ok {
|
|
// Deep copy the metric
|
|
newMetric := metric.Copy()
|
|
|
|
// Add new fields
|
|
if change.newFields != nil {
|
|
for _, p := range change.newFields {
|
|
newMetric.AddField(p.key, p.val)
|
|
}
|
|
}
|
|
|
|
// Add new tags
|
|
if change.newTags != nil {
|
|
for _, p := range change.newTags {
|
|
newMetric.AddTag(p.key, p.val)
|
|
}
|
|
}
|
|
|
|
// Run the hash function if required
|
|
if change.runHash {
|
|
newMetric.HashID()
|
|
}
|
|
|
|
answer = append(answer, newMetric)
|
|
}
|
|
}
|
|
|
|
return answer
|
|
}
|
|
|
|
func deepCopy(a []telegraf.Metric) []telegraf.Metric {
|
|
ret := make([]telegraf.Metric, 0, len(a))
|
|
for _, m := range a {
|
|
ret = append(ret, m.Copy())
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func belongs(m telegraf.Metric, ms []telegraf.Metric) bool {
|
|
for _, i := range ms {
|
|
if reflect.DeepEqual(i, m) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func subSet(a []telegraf.Metric, b []telegraf.Metric) bool {
|
|
subset := true
|
|
for _, m := range a {
|
|
if !belongs(m, b) {
|
|
subset = false
|
|
break
|
|
}
|
|
}
|
|
return subset
|
|
}
|
|
|
|
func equalSets(l1 []telegraf.Metric, l2 []telegraf.Metric) bool {
|
|
return subSet(l1, l2) && subSet(l2, l1)
|
|
}
|
|
|
|
func createDuration(t int) internal.Duration {
|
|
return internal.Duration{Duration: time.Second * time.Duration(t)}
|
|
}
|
|
|
|
func runAndCompare(topk *TopK, metrics []telegraf.Metric, answer []telegraf.Metric, testID string, t *testing.T) {
|
|
// Sleep for `period`, otherwise the processor will only
|
|
// cache the metrics, but it will not process them
|
|
time.Sleep(topk.Period.Duration)
|
|
|
|
// Run the processor
|
|
ret := topk.Apply(metrics...)
|
|
topk.Reset()
|
|
|
|
// The returned set mut be equal to the answer set
|
|
if !equalSets(ret, answer) {
|
|
t.Error("\nExpected metrics for", testID, ":\n",
|
|
answer, "\nReturned metrics:\n", ret)
|
|
}
|
|
}
|
|
|
|
// Smoke tests
|
|
func TestTopkAggregatorsSmokeTests(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.Fields = []string{"a"}
|
|
topk.GroupBy = []string{"tag_name"}
|
|
|
|
aggregators := []string{"mean", "sum", "max", "min"}
|
|
|
|
//The answer is equal to the original set for these particual scenarios
|
|
input := MetricsSet1
|
|
answer := MetricsSet1
|
|
|
|
for _, ag := range aggregators {
|
|
topk.Aggregation = ag
|
|
|
|
runAndCompare(&topk, input, answer, "SmokeAggregator_"+ag, t)
|
|
}
|
|
}
|
|
|
|
// AddAggregateFields + Mean aggregator
|
|
func TestTopkMeanAddAggregateFields(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.Aggregation = "mean"
|
|
topk.AddAggregateFields = []string{"a"}
|
|
topk.Fields = []string{"a"}
|
|
topk.GroupBy = []string{"tag_name"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet1)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"a_topk_aggregate", float64(28.044)})
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: chng},
|
|
1: {newFields: chng},
|
|
2: {newFields: chng},
|
|
3: {newFields: chng},
|
|
4: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "MeanAddAggregateFields test", t)
|
|
}
|
|
|
|
// AddAggregateFields + Sum aggregator
|
|
func TestTopkSumAddAggregateFields(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.Aggregation = "sum"
|
|
topk.AddAggregateFields = []string{"a"}
|
|
topk.Fields = []string{"a"}
|
|
topk.GroupBy = []string{"tag_name"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet1)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"a_topk_aggregate", float64(140.22)})
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: chng},
|
|
1: {newFields: chng},
|
|
2: {newFields: chng},
|
|
3: {newFields: chng},
|
|
4: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "SumAddAggregateFields test", t)
|
|
}
|
|
|
|
// AddAggregateFields + Max aggregator
|
|
func TestTopkMaxAddAggregateFields(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.Aggregation = "max"
|
|
topk.AddAggregateFields = []string{"a"}
|
|
topk.Fields = []string{"a"}
|
|
topk.GroupBy = []string{"tag_name"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet1)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"a_topk_aggregate", float64(50.5)})
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: chng},
|
|
1: {newFields: chng},
|
|
2: {newFields: chng},
|
|
3: {newFields: chng},
|
|
4: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "MaxAddAggregateFields test", t)
|
|
}
|
|
|
|
// AddAggregateFields + Min aggregator
|
|
func TestTopkMinAddAggregateFields(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.Aggregation = "min"
|
|
topk.AddAggregateFields = []string{"a"}
|
|
topk.Fields = []string{"a"}
|
|
topk.GroupBy = []string{"tag_name"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet1)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"a_topk_aggregate", float64(0.3)})
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: chng},
|
|
1: {newFields: chng},
|
|
2: {newFields: chng},
|
|
3: {newFields: chng},
|
|
4: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "MinAddAggregateFields test", t)
|
|
}
|
|
|
|
// GroupBy
|
|
func TestTopkGroupby1(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 3
|
|
topk.Aggregation = "sum"
|
|
topk.AddAggregateFields = []string{"value"}
|
|
topk.GroupBy = []string{"tag[13]"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
2: {newFields: fieldList(field{"value_topk_aggregate", float64(74.18)})},
|
|
3: {newFields: fieldList(field{"value_topk_aggregate", float64(72)})},
|
|
4: {newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
|
|
5: {newFields: fieldList(field{"value_topk_aggregate", float64(163.22)})},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy test 1", t)
|
|
}
|
|
func TestTopkGroupby2(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 3
|
|
topk.Aggregation = "mean"
|
|
topk.AddAggregateFields = []string{"value"}
|
|
topk.GroupBy = []string{"tag1"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
chng1 := fieldList(field{"value_topk_aggregate", float64(66.805)})
|
|
chng2 := fieldList(field{"value_topk_aggregate", float64(72)})
|
|
chng3 := fieldList(field{"value_topk_aggregate", float64(81.61)})
|
|
changeSet := map[int]metricChange{
|
|
1: {newFields: chng1},
|
|
2: {newFields: chng1},
|
|
3: {newFields: chng2},
|
|
4: {newFields: chng3},
|
|
5: {newFields: chng3},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy test 2", t)
|
|
}
|
|
func TestTopkGroupby3(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 1
|
|
topk.Aggregation = "min"
|
|
topk.AddAggregateFields = []string{"value"}
|
|
topk.GroupBy = []string{"tag4"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"value_topk_aggregate", float64(75.3)})
|
|
changeSet := map[int]metricChange{
|
|
4: {newFields: chng},
|
|
5: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy test 3", t)
|
|
}
|
|
|
|
// GroupBy + Fields
|
|
func TestTopkGroupbyFields1(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 4 // This settings generate less than 3 groups
|
|
topk.Aggregation = "mean"
|
|
topk.AddAggregateFields = []string{"A"}
|
|
topk.GroupBy = []string{"tag1", "tag2"}
|
|
topk.Fields = []string{"A"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
|
|
1: {newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
|
|
2: {newFields: fieldList(field{"A_topk_aggregate", float64(39.01)})},
|
|
5: {newFields: fieldList(field{"A_topk_aggregate", float64(29.45)})},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy Fields test 1", t)
|
|
}
|
|
|
|
func TestTopkGroupbyFields2(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 2
|
|
topk.Aggregation = "sum"
|
|
topk.AddAggregateFields = []string{"B", "C"}
|
|
topk.GroupBy = []string{"tag1", "tag3"}
|
|
topk.Fields = []string{"B", "C"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: fieldList(field{"C_topk_aggregate", float64(72.41)})},
|
|
2: {newFields: fieldList(field{"B_topk_aggregate", float64(60.96)})},
|
|
4: {newFields: fieldList(field{"B_topk_aggregate", float64(81.55)}, field{"C_topk_aggregate", float64(49.96)})},
|
|
5: {newFields: fieldList(field{"C_topk_aggregate", float64(49.96)})},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy Fields test 2", t)
|
|
}
|
|
|
|
// GroupBy metric name
|
|
func TestTopkGroupbyMetricName1(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 1
|
|
topk.Aggregation = "sum"
|
|
topk.AddAggregateFields = []string{"value"}
|
|
topk.GroupBy = []string{}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
chng := fieldList(field{"value_topk_aggregate", float64(235.22000000000003)})
|
|
changeSet := map[int]metricChange{
|
|
3: {newFields: chng},
|
|
4: {newFields: chng},
|
|
5: {newFields: chng},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy by metric name test 1", t)
|
|
}
|
|
|
|
func TestTopkGroupbyMetricName2(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 2
|
|
topk.Aggregation = "sum"
|
|
topk.AddAggregateFields = []string{"A", "value"}
|
|
topk.GroupBy = []string{"tag[12]"}
|
|
topk.Fields = []string{"A", "value"}
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
0: {newFields: fieldList(field{"A_topk_aggregate", float64(95.36)})},
|
|
1: {newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
|
|
2: {newFields: fieldList(field{"A_topk_aggregate", float64(78.02)}, field{"value_topk_aggregate", float64(133.61)})},
|
|
4: {newFields: fieldList(field{"value_topk_aggregate", float64(87.92)})},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupBy by metric name test 2", t)
|
|
}
|
|
|
|
// BottomK
|
|
func TestTopkBottomk(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 3
|
|
topk.Aggregation = "sum"
|
|
topk.GroupBy = []string{"tag1", "tag3"}
|
|
topk.Bottomk = true
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
0: {},
|
|
1: {},
|
|
3: {},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "Bottom k test", t)
|
|
}
|
|
|
|
// GroupByKeyTag
|
|
func TestTopkGroupByKeyTag(t *testing.T) {
|
|
|
|
// Build the processor
|
|
var topk TopK
|
|
topk = *New()
|
|
topk.Period = createDuration(1)
|
|
topk.K = 3
|
|
topk.Aggregation = "sum"
|
|
topk.GroupBy = []string{"tag1", "tag3"}
|
|
topk.AddGroupByTag = "gbt"
|
|
|
|
// Get the input
|
|
input := deepCopy(MetricsSet2)
|
|
|
|
// Generate the answer
|
|
changeSet := map[int]metricChange{
|
|
2: {newTags: tagList(tag{"gbt", "metric1&tag1=TWO&tag3=SIX&"})},
|
|
3: {newTags: tagList(tag{"gbt", "metric2&tag1=ONE&tag3=THREE&"})},
|
|
4: {newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
|
|
5: {newTags: tagList(tag{"gbt", "metric2&tag1=TWO&tag3=SEVEN&"})},
|
|
}
|
|
answer := generateAns(input, changeSet)
|
|
|
|
// Run the test
|
|
runAndCompare(&topk, input, answer, "GroupByKeyTag test", t)
|
|
}
|