From fead80844e75383ffccabfb848a649526db94ddc Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 22 Sep 2016 18:10:51 +0100 Subject: [PATCH] Refactor handling of MinMax functionality into RunningAggregator allows for easier addition of a sliding window at a later time. Also makes `period` be a generic argument for all aggregator plugins. --- agent/agent.go | 60 +++++---- aggregator.go | 20 +-- internal/config/config.go | 40 ++++-- internal/models/running_aggregator.go | 69 +++++++++- internal/models/running_aggregator_test.go | 117 ++++++++++++----- plugins/aggregators/minmax/minmax.go | 104 +++------------ plugins/aggregators/minmax/minmax_test.go | 139 +++------------------ 7 files changed, 252 insertions(+), 297 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a912126ba..0e2af0f9f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -259,9 +259,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er for { select { case <-shutdown: - for _, agg := range a.Config.Aggregators { - agg.Aggregator.Stop() - } if len(outMetricC) > 0 { // keep going until outMetricC is flushed continue @@ -273,7 +270,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er var dropOriginal bool if !m.IsAggregate() { for _, agg := range a.Config.Aggregators { - if ok := agg.Apply(copyMetric(m)); ok { + if ok := agg.Add(copyMetric(m)); ok { dropOriginal = true } } @@ -315,22 +312,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er } } -func copyMetric(m telegraf.Metric) telegraf.Metric { - t := time.Time(m.Time()) - - tags := make(map[string]string) - fields := make(map[string]interface{}) - for k, v := range m.Tags() { - tags[k] = v - } - for k, v := range m.Fields() { - fields[k] = v - } - - out, _ := telegraf.NewMetric(m.Name(), tags, fields, t) - return out -} - // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup @@ -367,18 +348,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { time.Sleep(time.Duration(i - (time.Now().UnixNano() % i))) } - // Start all Aggregators - for _, aggregator := range a.Config.Aggregators { - acc := NewAccumulator(aggregator, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - if err := aggregator.Aggregator.Start(acc); err != nil { - log.Printf("[%s] failed to start, exiting\n%s\n", - aggregator.Name(), err.Error()) - return err - } - } - wg.Add(1) go func() { defer wg.Done() @@ -403,6 +372,33 @@ func (a *Agent) Run(shutdown chan struct{}) error { }(input, interval) } + wg.Add(len(a.Config.Aggregators)) + for _, aggregator := range a.Config.Aggregators { + go func(agg *models.RunningAggregator) { + defer wg.Done() + acc := NewAccumulator(agg, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + agg.Run(acc, shutdown) + }(aggregator) + } + wg.Wait() return nil } + +func copyMetric(m telegraf.Metric) telegraf.Metric { + t := time.Time(m.Time()) + + tags := make(map[string]string) + fields := make(map[string]interface{}) + for k, v := range m.Tags() { + tags[k] = v + } + for k, v := range m.Fields() { + fields[k] = v + } + + out, _ := telegraf.NewMetric(m.Name(), tags, fields, t) + return out +} diff --git a/aggregator.go b/aggregator.go index 2a881c3a4..48aa8e4bf 100644 --- a/aggregator.go +++ b/aggregator.go @@ -1,16 +1,22 @@ package telegraf +// Aggregator is an interface for implementing an Aggregator plugin. +// the RunningAggregator wraps this interface and guarantees that +// Add, Push, and Reset can not be called concurrently, so locking is not +// required when implementing an Aggregator plugin. type Aggregator interface { - // SampleConfig returns the default configuration of the Input + // SampleConfig returns the default configuration of the Input. SampleConfig() string - // Description returns a one-sentence description on the Input + // Description returns a one-sentence description on the Input. Description() string - // Apply the metric to the aggregator - Apply(in Metric) + // Add the metric to the aggregator. + Add(in Metric) - // Start starts the service filter with the given accumulator - Start(acc Accumulator) error - Stop() + // Push pushes the current aggregates to the accumulator. + Push(acc Accumulator) + + // Reset resets the aggregators caches and aggregates. + Reset() } diff --git a/internal/config/config.go b/internal/config/config.go index 06822aa20..64338cded 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -693,7 +693,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error { } aggregator := creator() - aggregatorConfig, err := buildAggregator(name, table) + conf, err := buildAggregator(name, table) if err != nil { return err } @@ -702,12 +702,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error { return err } - rf := &models.RunningAggregator{ - Aggregator: aggregator, - Config: aggregatorConfig, - } - - c.Aggregators = append(c.Aggregators, rf) + c.Aggregators = append(c.Aggregators, models.NewRunningAggregator(aggregator, conf)) return nil } @@ -818,7 +813,6 @@ func (c *Config) addInput(name string, table *ast.Table) error { // buildAggregator TODO doc func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) { - conf := &models.AggregatorConfig{Name: name} unsupportedFields := []string{"tagexclude", "taginclude"} for _, field := range unsupportedFields { if _, ok := tbl.Fields[field]; ok { @@ -826,6 +820,34 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } + conf := &models.AggregatorConfig{Name: name} + + if node, ok := tbl.Fields["period"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + conf.Period = dur + } + } + } + + if node, ok := tbl.Fields["delay"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + conf.Delay = dur + } + } + } + if node, ok := tbl.Fields["drop_original"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if b, ok := kv.Value.(*ast.Boolean); ok { @@ -871,6 +893,8 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } + delete(tbl.Fields, "period") + delete(tbl.Fields, "delay") delete(tbl.Fields, "drop_original") delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index d344565f2..ca947bb12 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -7,8 +7,21 @@ import ( ) type RunningAggregator struct { - Aggregator telegraf.Aggregator - Config *AggregatorConfig + a telegraf.Aggregator + Config *AggregatorConfig + + metrics chan telegraf.Metric +} + +func NewRunningAggregator( + a telegraf.Aggregator, + conf *AggregatorConfig, +) *RunningAggregator { + return &RunningAggregator{ + a: a, + Config: conf, + metrics: make(chan telegraf.Metric, 100), + } } // AggregatorConfig containing configuration parameters for the running @@ -22,6 +35,9 @@ type AggregatorConfig struct { MeasurementSuffix string Tags map[string]string Filter Filter + + Period time.Duration + Delay time.Duration } func (r *RunningAggregator) Name() string { @@ -56,10 +72,10 @@ func (r *RunningAggregator) MakeMetric( return m } -// Apply applies the given metric to the aggregator. +// Add applies the given metric to the aggregator. // Before applying to the plugin, it will run any defined filters on the metric. // Apply returns true if the original metric should be dropped. -func (r *RunningAggregator) Apply(in telegraf.Metric) bool { +func (r *RunningAggregator) Add(in telegraf.Metric) bool { if r.Config.Filter.IsActive() { // check if the aggregator should apply this metric name := in.Name() @@ -74,6 +90,49 @@ func (r *RunningAggregator) Apply(in telegraf.Metric) bool { in, _ = telegraf.NewMetric(name, tags, fields, t) } - r.Aggregator.Apply(in) + r.metrics <- in return r.Config.DropOriginal } +func (r *RunningAggregator) add(in telegraf.Metric) { + r.a.Add(in) +} + +func (r *RunningAggregator) push(acc telegraf.Accumulator) { + r.a.Push(acc) +} + +func (r *RunningAggregator) reset() { + r.a.Reset() +} + +func (r *RunningAggregator) Run( + acc telegraf.Accumulator, + shutdown chan struct{}, +) { + if r.Config.Delay == 0 { + r.Config.Delay = time.Millisecond * 100 + } + if r.Config.Period == 0 { + r.Config.Period = time.Second * 30 + } + + time.Sleep(r.Config.Delay) + periodT := time.NewTicker(r.Config.Period) + defer periodT.Stop() + + for { + select { + case <-shutdown: + if len(r.metrics) > 0 { + // wait until metrics are flushed before exiting + continue + } + return + case m := <-r.metrics: + r.add(m) + case <-periodT.C: + r.push(acc) + r.reset() + } + } +} diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index d2f04e202..f816c0a80 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -2,26 +2,28 @@ package models import ( "fmt" + "sync" + "sync/atomic" "testing" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" ) -func TestApply(t *testing.T) { +func TestAdd(t *testing.T) { a := &TestAggregator{} - ra := RunningAggregator{ - Config: &AggregatorConfig{ - Name: "TestRunningAggregator", - Filter: Filter{ - NamePass: []string{"*"}, - }, + ra := NewRunningAggregator(a, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, }, - Aggregator: a, - } + }) assert.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + go ra.Run(&acc, make(chan struct{})) m := ra.MakeMetric( "RITest", @@ -30,21 +32,64 @@ func TestApply(t *testing.T) { telegraf.Untyped, time.Now(), ) - assert.False(t, ra.Apply(m)) - assert.Equal(t, int64(101), a.sum) + assert.False(t, ra.Add(m)) + + for { + if atomic.LoadInt64(&a.sum) > 0 { + break + } + } + assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum)) } -func TestApplyDropOriginal(t *testing.T) { - ra := RunningAggregator{ - Config: &AggregatorConfig{ - Name: "TestRunningAggregator", - Filter: Filter{ - NamePass: []string{"RI*"}, - }, - DropOriginal: true, +func TestAddAndPushOnePeriod(t *testing.T) { + a := &TestAggregator{} + ra := NewRunningAggregator(a, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, }, - Aggregator: &TestAggregator{}, + Period: time.Millisecond * 500, + }) + assert.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + shutdown := make(chan struct{}) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ra.Run(&acc, shutdown) + }() + + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now(), + ) + assert.False(t, ra.Add(m)) + + for { + if acc.NMetrics() > 0 { + break + } } + acc.AssertContainsFields(t, "TestMetric", map[string]interface{}{"sum": int64(101)}) + + close(shutdown) + wg.Wait() +} + +func TestAddDropOriginal(t *testing.T) { + ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"RI*"}, + }, + DropOriginal: true, + }) assert.NoError(t, ra.Config.Filter.Compile()) m := ra.MakeMetric( @@ -54,9 +99,9 @@ func TestApplyDropOriginal(t *testing.T) { telegraf.Untyped, time.Now(), ) - assert.True(t, ra.Apply(m)) + assert.True(t, ra.Add(m)) - // this metric name doesn't match the filter, so Apply will return false + // this metric name doesn't match the filter, so Add will return false m2 := ra.MakeMetric( "foobar", map[string]interface{}{"value": int(101)}, @@ -64,17 +109,15 @@ func TestApplyDropOriginal(t *testing.T) { telegraf.Untyped, time.Now(), ) - assert.False(t, ra.Apply(m2)) + assert.False(t, ra.Add(m2)) } // make an untyped, counter, & gauge metric func TestMakeMetricA(t *testing.T) { now := time.Now() - ra := RunningAggregator{ - Config: &AggregatorConfig{ - Name: "TestRunningAggregator", - }, - } + ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{ + Name: "TestRunningAggregator", + }) assert.Equal(t, "aggregators.TestRunningAggregator", ra.Name()) m := ra.MakeMetric( @@ -136,15 +179,21 @@ type TestAggregator struct { sum int64 } -func (t *TestAggregator) Description() string { return "" } -func (t *TestAggregator) SampleConfig() string { return "" } -func (t *TestAggregator) Start(acc telegraf.Accumulator) error { return nil } -func (t *TestAggregator) Stop() {} +func (t *TestAggregator) Description() string { return "" } +func (t *TestAggregator) SampleConfig() string { return "" } +func (t *TestAggregator) Reset() {} -func (t *TestAggregator) Apply(in telegraf.Metric) { +func (t *TestAggregator) Push(acc telegraf.Accumulator) { + acc.AddFields("TestMetric", + map[string]interface{}{"sum": t.sum}, + map[string]string{}, + ) +} + +func (t *TestAggregator) Add(in telegraf.Metric) { for _, v := range in.Fields() { if vi, ok := v.(int64); ok { - t.sum += vi + atomic.AddInt64(&t.sum, vi) } } } diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go index 62d885535..0c88a31b4 100644 --- a/plugins/aggregators/minmax/minmax.go +++ b/plugins/aggregators/minmax/minmax.go @@ -1,28 +1,21 @@ package minmax import ( - "sync" - "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/aggregators" ) type MinMax struct { - Period internal.Duration - - // metrics waiting to be processed - metrics chan telegraf.Metric - shutdown chan struct{} - wg sync.WaitGroup - // caches for metric fields, names, and tags fieldCache map[uint64]map[string]minmax nameCache map[uint64]string tagCache map[uint64]map[string]string +} - acc telegraf.Accumulator +func NewMinMax() telegraf.Aggregator { + mm := &MinMax{} + mm.Reset() + return mm } type minmax struct { @@ -43,11 +36,7 @@ func (m *MinMax) Description() string { return "Keep the aggregate min/max of each metric passing through." } -func (m *MinMax) Apply(in telegraf.Metric) { - m.metrics <- in -} - -func (m *MinMax) apply(in telegraf.Metric) { +func (m *MinMax) Add(in telegraf.Metric) { id := in.HashID() if _, ok := m.nameCache[id]; !ok { // hit an uncached metric, create caches for first time: @@ -90,84 +79,23 @@ func (m *MinMax) apply(in telegraf.Metric) { } } -func (m *MinMax) Start(acc telegraf.Accumulator) error { - m.metrics = make(chan telegraf.Metric, 10) - m.shutdown = make(chan struct{}) - m.clearCache() - m.acc = acc - m.wg.Add(1) - if m.Period.Duration > 0 { - go m.periodHandler() - } else { - go m.continuousHandler() +func (m *MinMax) Push(acc telegraf.Accumulator) { + for id, _ := range m.nameCache { + fields := map[string]interface{}{} + for k, v := range m.fieldCache[id] { + fields[k+"_min"] = v.min + fields[k+"_max"] = v.max + } + acc.AddFields(m.nameCache[id], fields, m.tagCache[id]) } - return nil } -func (m *MinMax) Stop() { - close(m.shutdown) - m.wg.Wait() -} - -func (m *MinMax) addfields(id uint64) { - fields := map[string]interface{}{} - for k, v := range m.fieldCache[id] { - fields[k+"_min"] = v.min - fields[k+"_max"] = v.max - } - m.acc.AddFields(m.nameCache[id], fields, m.tagCache[id]) -} - -func (m *MinMax) clearCache() { +func (m *MinMax) Reset() { m.fieldCache = make(map[uint64]map[string]minmax) m.nameCache = make(map[uint64]string) m.tagCache = make(map[uint64]map[string]string) } -// periodHandler only adds the aggregate metrics on the configured Period. -// thus if telegraf's collection interval is 10s, and period is 30s, there -// will only be one aggregate sent every 3 metrics. -func (m *MinMax) periodHandler() { - // TODO make this sleep less of a hack! - time.Sleep(time.Millisecond * 200) - defer m.wg.Done() - ticker := time.NewTicker(m.Period.Duration) - defer ticker.Stop() - for { - select { - case in := <-m.metrics: - m.apply(in) - case <-m.shutdown: - if len(m.metrics) > 0 { - continue - } - return - case <-ticker.C: - for id, _ := range m.nameCache { - m.addfields(id) - } - m.clearCache() - } - } -} - -// continuousHandler sends one metric for every metric that passes through it. -func (m *MinMax) continuousHandler() { - defer m.wg.Done() - for { - select { - case in := <-m.metrics: - m.apply(in) - m.addfields(in.HashID()) - case <-m.shutdown: - if len(m.metrics) > 0 { - continue - } - return - } - } -} - func compare(a, b float64) int { if a < b { return -1 @@ -190,6 +118,6 @@ func convert(in interface{}) (float64, bool) { func init() { aggregators.Add("minmax", func() telegraf.Aggregator { - return &MinMax{} + return NewMinMax() }) } diff --git a/plugins/aggregators/minmax/minmax_test.go b/plugins/aggregators/minmax/minmax_test.go index fb902a99f..97af5749d 100644 --- a/plugins/aggregators/minmax/minmax_test.go +++ b/plugins/aggregators/minmax/minmax_test.go @@ -5,10 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" - - "github.com/stretchr/testify/assert" ) var m1, _ = telegraf.NewMetric("m1", @@ -48,34 +45,22 @@ var m2, _ = telegraf.NewMetric("m1", ) func BenchmarkApply(b *testing.B) { - minmax := MinMax{} - minmax.clearCache() + minmax := NewMinMax() for n := 0; n < b.N; n++ { - minmax.apply(m1) - minmax.apply(m2) + minmax.Add(m1) + minmax.Add(m2) } } -// Test two metrics getting added, when running with a period, and the metrics -// are added in the same period. +// Test two metrics getting added. func TestMinMaxWithPeriod(t *testing.T) { acc := testutil.Accumulator{} - minmax := MinMax{ - Period: internal.Duration{Duration: time.Millisecond * 500}, - } - assert.NoError(t, minmax.Start(&acc)) - defer minmax.Stop() + minmax := NewMinMax() - minmax.Apply(m1) - minmax.Apply(m2) - - for { - if acc.NMetrics() > 0 { - break - } - time.Sleep(time.Millisecond) - } + minmax.Add(m1) + minmax.Add(m2) + minmax.Push(&acc) expectedFields := map[string]interface{}{ "a_max": float64(1), @@ -107,23 +92,14 @@ func TestMinMaxWithPeriod(t *testing.T) { acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) } -// Test two metrics getting added, when running with a period, and the metrics -// are added in two different periods. +// Test two metrics getting added with a push/reset in between (simulates +// getting added in different periods.) func TestMinMaxDifferentPeriods(t *testing.T) { acc := testutil.Accumulator{} - minmax := MinMax{ - Period: internal.Duration{Duration: time.Millisecond * 100}, - } - assert.NoError(t, minmax.Start(&acc)) - defer minmax.Stop() + minmax := NewMinMax() - minmax.Apply(m1) - for { - if acc.NMetrics() > 0 { - break - } - time.Sleep(time.Millisecond) - } + minmax.Add(m1) + minmax.Push(&acc) expectedFields := map[string]interface{}{ "a_max": float64(1), "a_min": float64(1), @@ -152,13 +128,9 @@ func TestMinMaxDifferentPeriods(t *testing.T) { acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) acc.ClearMetrics() - minmax.Apply(m2) - for { - if acc.NMetrics() > 0 { - break - } - time.Sleep(time.Millisecond) - } + minmax.Reset() + minmax.Add(m2) + minmax.Push(&acc) expectedFields = map[string]interface{}{ "a_max": float64(1), "a_min": float64(1), @@ -188,82 +160,3 @@ func TestMinMaxDifferentPeriods(t *testing.T) { } acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) } - -// Test two metrics getting added, when running without a period. -func TestMinMaxWithoutPeriod(t *testing.T) { - acc := testutil.Accumulator{} - minmax := MinMax{} - assert.NoError(t, minmax.Start(&acc)) - defer minmax.Stop() - - minmax.Apply(m1) - for { - if acc.NMetrics() > 0 { - break - } - time.Sleep(time.Millisecond) - } - expectedFields := map[string]interface{}{ - "a_max": float64(1), - "a_min": float64(1), - "b_max": float64(1), - "b_min": float64(1), - "c_max": float64(1), - "c_min": float64(1), - "d_max": float64(1), - "d_min": float64(1), - "e_max": float64(1), - "e_min": float64(1), - "f_max": float64(2), - "f_min": float64(2), - "g_max": float64(2), - "g_min": float64(2), - "h_max": float64(2), - "h_min": float64(2), - "i_max": float64(2), - "i_min": float64(2), - "j_max": float64(3), - "j_min": float64(3), - } - expectedTags := map[string]string{ - "foo": "bar", - } - acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) - - acc.ClearMetrics() - minmax.Apply(m2) - for { - if acc.NMetrics() > 0 { - break - } - time.Sleep(time.Millisecond) - } - expectedFields = map[string]interface{}{ - "a_max": float64(1), - "a_min": float64(1), - "b_max": float64(3), - "b_min": float64(1), - "c_max": float64(3), - "c_min": float64(1), - "d_max": float64(3), - "d_min": float64(1), - "e_max": float64(3), - "e_min": float64(1), - "f_max": float64(2), - "f_min": float64(1), - "g_max": float64(2), - "g_min": float64(1), - "h_max": float64(2), - "h_min": float64(1), - "i_max": float64(2), - "i_min": float64(1), - "j_max": float64(3), - "j_min": float64(1), - "k_max": float64(200), - "k_min": float64(200), - } - expectedTags = map[string]string{ - "foo": "bar", - } - acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) -}