From 601dc99606359a47c1b1519bf4e3be59a6f5f364 Mon Sep 17 00:00:00 2001 From: Piotr Popieluch Date: Fri, 19 Jan 2018 02:37:53 +0100 Subject: [PATCH] Align aggregator period with internal ticker to avoid skipping metrics (#3693) By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics. --- agent/agent.go | 4 +--- internal/models/running_aggregator.go | 2 +- internal/models/running_aggregator_test.go | 6 +++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index af96718cd..8739f941d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -364,8 +364,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { metricC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100) - now := time.Now() - // Start all ServicePlugins for _, input := range a.Config.Inputs { input.SetDefaultTags(a.Config.Tags) @@ -406,7 +404,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { acc := NewAccumulator(agg, aggC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) - agg.Run(acc, now, shutdown) + agg.Run(acc, shutdown) }(aggregator) } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 91e5334e8..8189a6667 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() { // for period ticks to tell it when to push and reset the aggregator. func (r *RunningAggregator) Run( acc telegraf.Accumulator, - now time.Time, shutdown chan struct{}, ) { // The start of the period is truncated to the nearest second. @@ -133,6 +132,7 @@ func (r *RunningAggregator) Run( // 2nd interval: 00:10 - 00:20.5 // etc. // + now := time.Now() r.periodStart = now.Truncate(time.Second) truncation := now.Sub(r.periodStart) r.periodEnd = r.periodStart.Add(r.Config.Period) diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index cb56dc4ef..30279f0ee 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -24,7 +24,7 @@ func TestAdd(t *testing.T) { }) assert.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} - go ra.Run(&acc, time.Now(), make(chan struct{})) + go ra.Run(&acc, make(chan struct{})) m := ra.MakeMetric( "RITest", @@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { }) assert.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} - go ra.Run(&acc, time.Now(), make(chan struct{})) + go ra.Run(&acc, make(chan struct{})) // metric before current period m := ra.MakeMetric( @@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - ra.Run(&acc, time.Now(), shutdown) + ra.Run(&acc, shutdown) }() m := ra.MakeMetric(