From a84ce5d5cb25a696fe61304e00b129232600391e Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 10 Oct 2016 13:43:47 +0100 Subject: [PATCH] drop metrics outside of the aggregators period --- agent/agent.go | 22 ++++---- docs/CONFIGURATION.md | 4 +- internal/models/running_aggregator.go | 33 ++++++++++++ internal/models/running_aggregator_test.go | 62 ++++++++++++++++++++-- 4 files changed, 106 insertions(+), 15 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 47d5336e4..1a205e218 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -358,6 +358,17 @@ func (a *Agent) Run(shutdown chan struct{}) error { } }() + wg.Add(len(a.Config.Aggregators)) + for _, aggregator := range a.Config.Aggregators { + go func(agg *models.RunningAggregator) { + defer wg.Done() + acc := NewAccumulator(agg, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + agg.Run(acc, shutdown) + }(aggregator) + } + wg.Add(len(a.Config.Inputs)) for _, input := range a.Config.Inputs { interval := a.Config.Agent.Interval.Duration @@ -371,17 +382,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { }(input, interval) } - wg.Add(len(a.Config.Aggregators)) - for _, aggregator := range a.Config.Aggregators { - go func(agg *models.RunningAggregator) { - defer wg.Done() - acc := NewAccumulator(agg, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - agg.Run(acc, shutdown) - }(aggregator) - } - wg.Wait() return nil } diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 5cf43f532..d05fc987e 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -87,7 +87,9 @@ There are no generic configuration options available for all outputs. The following config parameters are available for all aggregators: -* **period**: The period on which to flush & clear each aggregator. +* **period**: The period on which to flush & clear each aggregator. 
All metrics +that are sent with timestamps outside of this period will be ignored by the +aggregator. * **delay**: The delay before each aggregator is flushed. This is to control how long for aggregators to wait before receiving metrics from input plugins, in the case that aggregators are flushing and inputs are gathering on the diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 9c8403f5f..5c7640ba6 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -11,6 +11,9 @@ type RunningAggregator struct { Config *AggregatorConfig metrics chan telegraf.Metric + + periodStart time.Time + periodEnd time.Time } func NewRunningAggregator( @@ -105,10 +108,32 @@ func (r *RunningAggregator) reset() { r.a.Reset() } +// Run runs the running aggregator, listens for incoming metrics, and waits +// for period ticks to tell it when to push and reset the aggregator. func (r *RunningAggregator) Run( acc telegraf.Accumulator, shutdown chan struct{}, ) { + // The start of the period is truncated to the nearest second. + // + // Every metric then gets its timestamp checked and is dropped if it + // is not within: + // + // start <= t <= end + truncation + delay + // + // So if we start at now = 00:00.2 with a 10s period and 0.3s delay: + // now = 00:00.2 + // start = 00:00 + // truncation = 00:00.2 + // end = 00:10 + // 1st interval: 00:00 - 00:10.5 + // 2nd interval: 00:10 - 00:20.5 + // etc. + // + now := time.Now() + r.periodStart = now.Truncate(time.Second) + truncation := now.Sub(r.periodStart) + r.periodEnd = r.periodStart.Add(r.Config.Period) time.Sleep(r.Config.Delay) periodT := time.NewTicker(r.Config.Period) defer periodT.Stop() @@ -122,8 +147,16 @@ func (r *RunningAggregator) Run( } return case m := <-r.metrics: + if m.Time().Before(r.periodStart) || + m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) { + // the metric is outside the current aggregation period, so + // skip it. 
+ continue + } r.add(m) case <-periodT.C: + r.periodStart = r.periodEnd + r.periodEnd = r.periodStart.Add(r.Config.Period) r.push(acc) r.reset() } diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index 495b8ddda..834f7d1e0 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -31,11 +31,64 @@ func TestAdd(t *testing.T) { map[string]interface{}{"value": int(101)}, map[string]string{}, telegraf.Untyped, - time.Now(), + time.Now().Add(time.Millisecond*150), ) assert.False(t, ra.Add(m)) for { + time.Sleep(time.Millisecond) + if atomic.LoadInt64(&a.sum) > 0 { + break + } + } + assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum)) +} + +func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { + a := &TestAggregator{} + ra := NewRunningAggregator(a, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, + }, + Period: time.Millisecond * 500, + }) + assert.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + go ra.Run(&acc, make(chan struct{})) + + // metric before current period + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(-time.Hour), + ) + assert.False(t, ra.Add(m)) + + // metric after current period + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(time.Hour), + ) + assert.False(t, ra.Add(m)) + + // "now" metric + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now().Add(time.Millisecond*50), + ) + assert.False(t, ra.Add(m)) + + for { + time.Sleep(time.Millisecond) if atomic.LoadInt64(&a.sum) > 0 { break } @@ -68,11 +121,12 @@ func TestAddAndPushOnePeriod(t *testing.T) { map[string]interface{}{"value": int(101)}, map[string]string{}, 
telegraf.Untyped, - time.Now(), + time.Now().Add(time.Millisecond*100), ) assert.False(t, ra.Add(m)) for { + time.Sleep(time.Millisecond) if acc.NMetrics() > 0 { break } @@ -182,7 +236,9 @@ type TestAggregator struct { func (t *TestAggregator) Description() string { return "" } func (t *TestAggregator) SampleConfig() string { return "" } -func (t *TestAggregator) Reset() {} +func (t *TestAggregator) Reset() { + atomic.StoreInt64(&t.sum, 0) +} func (t *TestAggregator) Push(acc telegraf.Accumulator) { acc.AddFields("TestMetric",