Align aggregator period with internal ticker to avoid skipping metrics (#3693)

By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics.
This commit is contained in:
Piotr Popieluch 2018-01-19 02:37:53 +01:00 committed by Daniel Nelson
parent 96a175a54c
commit 3ea7c2d222
3 changed files with 5 additions and 7 deletions

View File

@ -364,8 +364,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100) metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100)
now := time.Now()
// Start all ServicePlugins // Start all ServicePlugins
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags) input.SetDefaultTags(a.Config.Tags)
@ -406,7 +404,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
acc := NewAccumulator(agg, aggC) acc := NewAccumulator(agg, aggC)
acc.SetPrecision(a.Config.Agent.Precision.Duration, acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration) a.Config.Agent.Interval.Duration)
agg.Run(acc, now, shutdown) agg.Run(acc, shutdown)
}(aggregator) }(aggregator)
} }

View File

@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
// for period ticks to tell it when to push and reset the aggregator. // for period ticks to tell it when to push and reset the aggregator.
func (r *RunningAggregator) Run( func (r *RunningAggregator) Run(
acc telegraf.Accumulator, acc telegraf.Accumulator,
now time.Time,
shutdown chan struct{}, shutdown chan struct{},
) { ) {
// The start of the period is truncated to the nearest second. // The start of the period is truncated to the nearest second.
@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
// 2nd interval: 00:10 - 00:20.5 // 2nd interval: 00:10 - 00:20.5
// etc. // etc.
// //
now := time.Now()
r.periodStart = now.Truncate(time.Second) r.periodStart = now.Truncate(time.Second)
truncation := now.Sub(r.periodStart) truncation := now.Sub(r.periodStart)
r.periodEnd = r.periodStart.Add(r.Config.Period) r.periodEnd = r.periodStart.Add(r.Config.Period)

View File

@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
}) })
assert.NoError(t, ra.Config.Filter.Compile()) assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{})) go ra.Run(&acc, make(chan struct{}))
m := ra.MakeMetric( m := ra.MakeMetric(
"RITest", "RITest",
@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
}) })
assert.NoError(t, ra.Config.Filter.Compile()) assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{})) go ra.Run(&acc, make(chan struct{}))
// metric before current period // metric before current period
m := ra.MakeMetric( m := ra.MakeMetric(
@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
ra.Run(&acc, time.Now(), shutdown) ra.Run(&acc, shutdown)
}() }()
m := ra.MakeMetric( m := ra.MakeMetric(