diff --git a/internal/config/config.go b/internal/config/config.go index 3dc2c02ee..06822aa20 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -887,8 +887,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err // buildProcessor TODO doc func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { conf := &models.ProcessorConfig{Name: name} - unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", - "tagexclude", "taginclude"} + unsupportedFields := []string{"tagexclude", "taginclude"} for _, field := range unsupportedFields { if _, ok := tbl.Fields[field]; ok { // TODO raise error because field is not supported diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go index e628ad7ac..62d885535 100644 --- a/plugins/aggregators/minmax/minmax.go +++ b/plugins/aggregators/minmax/minmax.go @@ -26,8 +26,8 @@ type MinMax struct { } type minmax struct { - min interface{} - max interface{} + min float64 + max float64 } var sampleConfig = ` @@ -55,24 +55,36 @@ func (m *MinMax) apply(in telegraf.Metric) { m.tagCache[id] = in.Tags() m.fieldCache[id] = make(map[string]minmax) for k, v := range in.Fields() { - m.fieldCache[id][k] = minmax{ - min: v, - max: v, + if fv, ok := convert(v); ok { + m.fieldCache[id][k] = minmax{ + min: fv, + max: fv, + } } } } else { for k, v := range in.Fields() { - cmpmin := compare(m.fieldCache[id][k].min, v) - cmpmax := compare(m.fieldCache[id][k].max, v) - if cmpmin == 1 { - tmp := m.fieldCache[id][k] - tmp.min = v - m.fieldCache[id][k] = tmp - } - if cmpmax == -1 { - tmp := m.fieldCache[id][k] - tmp.max = v - m.fieldCache[id][k] = tmp + if fv, ok := convert(v); ok { + if _, ok := m.fieldCache[id][k]; !ok { + // hit an uncached field of a cached metric + m.fieldCache[id][k] = minmax{ + min: fv, + max: fv, + } + continue + } + cmpmin := compare(m.fieldCache[id][k].min, fv) + cmpmax := compare(m.fieldCache[id][k].max, fv) + if cmpmin == 1 { + tmp := m.fieldCache[id][k] + tmp.min = fv + m.fieldCache[id][k] = tmp + } + if cmpmax == -1 { + tmp := m.fieldCache[id][k] + tmp.max = fv + m.fieldCache[id][k] = tmp + } } } } @@ -156,32 +168,23 @@ func (m *MinMax) continuousHandler() { } } -func compare(a, b interface{}) int { - switch at := a.(type) { - case int64: - if bt, ok := b.(int64); ok { - if at < bt { - return -1 - } else if at > bt { - return 1 - } - return 0 - } else { - return 0 - } +func compare(a, b float64) int { + if a < b { + return -1 + } else if a > b { + return 1 + } + return 0 +} + +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { case float64: - if bt, ok := b.(float64); ok { - if at < bt { - return -1 - } else if at > bt { - return 1 - } - return 0 - } else { - return 0 - } + return v, true + case int64: + return float64(v), true default: - return 0 + return 0, false } } diff --git a/plugins/aggregators/minmax/minmax_test.go b/plugins/aggregators/minmax/minmax_test.go index 5a854d91b..fb902a99f 100644 --- a/plugins/aggregators/minmax/minmax_test.go +++ b/plugins/aggregators/minmax/minmax_test.go @@ -5,47 +5,265 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +var m1, _ = telegraf.NewMetric("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(1), + "c": int64(1), + "d": int64(1), + "e": int64(1), + "f": float64(2), + "g": float64(2), + "h": float64(2), + "i": float64(2), + "j": float64(3), + }, + time.Now(), +) +var m2, _ = telegraf.NewMetric("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(3), + "c": int64(3), + "d": int64(3), + "e": int64(3), + "f": float64(1), + "g": float64(1), + "h": float64(1), + "i": float64(1), + "j": float64(1), + "k": float64(200), + "ignoreme": "string", + "andme": true, + }, + time.Now(), ) func BenchmarkApply(b *testing.B) { minmax := MinMax{} minmax.clearCache() - m1, _ := telegraf.NewMetric("m1", - map[string]string{"foo": "bar"}, - map[string]interface{}{ - "a": int64(1), - "b": int64(1), - "c": int64(1), - "d": int64(1), - "e": int64(1), - "f": float64(2), - "g": float64(2), - "h": float64(2), - "i": float64(2), - "j": float64(3), - }, - time.Now(), - ) - m2, _ := telegraf.NewMetric("m1", - map[string]string{"foo": "bar"}, - map[string]interface{}{ - "a": int64(3), - "b": int64(3), - "c": int64(3), - "d": int64(3), - "e": int64(3), - "f": float64(1), - "g": float64(1), - "h": float64(1), - "i": float64(1), - "j": float64(1), - }, - time.Now(), - ) - for n := 0; n < b.N; n++ { minmax.apply(m1) minmax.apply(m2) } } + +// Test two metrics getting added, when running with a period, and the metrics +// are added in the same period. +func TestMinMaxWithPeriod(t *testing.T) { + acc := testutil.Accumulator{} + minmax := MinMax{ + Period: internal.Duration{Duration: time.Millisecond * 500}, + } + assert.NoError(t, minmax.Start(&acc)) + defer minmax.Stop() + + minmax.Apply(m1) + minmax.Apply(m2) + + for { + if acc.NMetrics() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + expectedFields := map[string]interface{}{ + "a_max": float64(1), + "a_min": float64(1), + "b_max": float64(3), + "b_min": float64(1), + "c_max": float64(3), + "c_min": float64(1), + "d_max": float64(3), + "d_min": float64(1), + "e_max": float64(3), + "e_min": float64(1), + "f_max": float64(2), + "f_min": float64(1), + "g_max": float64(2), + "g_min": float64(1), + "h_max": float64(2), + "h_min": float64(1), + "i_max": float64(2), + "i_min": float64(1), + "j_max": float64(3), + "j_min": float64(1), + "k_max": float64(200), + "k_min": float64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test two metrics getting added, when running with a period, and the metrics +// are added in two different periods. +func TestMinMaxDifferentPeriods(t *testing.T) { + acc := testutil.Accumulator{} + minmax := MinMax{ + Period: internal.Duration{Duration: time.Millisecond * 100}, + } + assert.NoError(t, minmax.Start(&acc)) + defer minmax.Stop() + + minmax.Apply(m1) + for { + if acc.NMetrics() > 0 { + break + } + time.Sleep(time.Millisecond) + } + expectedFields := map[string]interface{}{ + "a_max": float64(1), + "a_min": float64(1), + "b_max": float64(1), + "b_min": float64(1), + "c_max": float64(1), + "c_min": float64(1), + "d_max": float64(1), + "d_min": float64(1), + "e_max": float64(1), + "e_min": float64(1), + "f_max": float64(2), + "f_min": float64(2), + "g_max": float64(2), + "g_min": float64(2), + "h_max": float64(2), + "h_min": float64(2), + "i_max": float64(2), + "i_min": float64(2), + "j_max": float64(3), + "j_min": float64(3), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) + + acc.ClearMetrics() + minmax.Apply(m2) + for { + if acc.NMetrics() > 0 { + break + } + time.Sleep(time.Millisecond) + } + expectedFields = map[string]interface{}{ + "a_max": float64(1), + "a_min": float64(1), + "b_max": float64(3), + "b_min": float64(3), + "c_max": float64(3), + "c_min": float64(3), + "d_max": float64(3), + "d_min": float64(3), + "e_max": float64(3), + "e_min": float64(3), + "f_max": float64(1), + "f_min": float64(1), + "g_max": float64(1), + "g_min": float64(1), + "h_max": float64(1), + "h_min": float64(1), + "i_max": float64(1), + "i_min": float64(1), + "j_max": float64(1), + "j_min": float64(1), + "k_max": float64(200), + "k_min": float64(200), + } + expectedTags = map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test two metrics getting added, when running without a period. +func TestMinMaxWithoutPeriod(t *testing.T) { + acc := testutil.Accumulator{} + minmax := MinMax{} + assert.NoError(t, minmax.Start(&acc)) + defer minmax.Stop() + + minmax.Apply(m1) + for { + if acc.NMetrics() > 0 { + break + } + time.Sleep(time.Millisecond) + } + expectedFields := map[string]interface{}{ + "a_max": float64(1), + "a_min": float64(1), + "b_max": float64(1), + "b_min": float64(1), + "c_max": float64(1), + "c_min": float64(1), + "d_max": float64(1), + "d_min": float64(1), + "e_max": float64(1), + "e_min": float64(1), + "f_max": float64(2), + "f_min": float64(2), + "g_max": float64(2), + "g_min": float64(2), + "h_max": float64(2), + "h_min": float64(2), + "i_max": float64(2), + "i_min": float64(2), + "j_max": float64(3), + "j_min": float64(3), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) + + acc.ClearMetrics() + minmax.Apply(m2) + for { + if acc.NMetrics() > 0 { + break + } + time.Sleep(time.Millisecond) + } + expectedFields = map[string]interface{}{ + "a_max": float64(1), + "a_min": float64(1), + "b_max": float64(3), + "b_min": float64(1), + "c_max": float64(3), + "c_min": float64(1), + "d_max": float64(3), + "d_min": float64(1), + "e_max": float64(3), + "e_min": float64(1), + "f_max": float64(2), + "f_min": float64(1), + "g_max": float64(2), + "g_min": float64(1), + "h_max": float64(2), + "h_min": float64(1), + "i_max": float64(2), + "i_min": float64(1), + "j_max": float64(3), + "j_min": float64(1), + "k_max": float64(200), + "k_min": float64(200), + } + expectedTags = map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} diff --git a/plugins/processors/printer/printer_test.go b/plugins/processors/printer/printer_test.go index e69de29bb..b0c0dd810 100644 --- a/plugins/processors/printer/printer_test.go +++ b/plugins/processors/printer/printer_test.go @@ -0,0 +1 @@ +package printer diff --git a/testutil/accumulator.go b/testutil/accumulator.go index fe5727917..99f9e3006 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -39,6 +39,13 @@ func (a *Accumulator) NMetrics() uint64 { return atomic.LoadUint64(&a.nMetrics) } +func (a *Accumulator) ClearMetrics() { + atomic.StoreUint64(&a.nMetrics, 0) + a.Lock() + defer a.Unlock() + a.Metrics = make([]*Metric, 0) +} + // AddFields adds a measurement point with a specified timestamp. func (a *Accumulator) AddFields( measurement string,