Align aggregator period with internal ticker to avoid skipping metrics (#3693)
By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics.
(cherry picked from commit 601dc99606)
This commit is contained in:
parent
82df5bf2d8
commit
bb3ee1fd39
|
|
@ -364,8 +364,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
metricC := make(chan telegraf.Metric, 100)
|
metricC := make(chan telegraf.Metric, 100)
|
||||||
aggC := make(chan telegraf.Metric, 100)
|
aggC := make(chan telegraf.Metric, 100)
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
// Start all ServicePlugins
|
// Start all ServicePlugins
|
||||||
for _, input := range a.Config.Inputs {
|
for _, input := range a.Config.Inputs {
|
||||||
input.SetDefaultTags(a.Config.Tags)
|
input.SetDefaultTags(a.Config.Tags)
|
||||||
|
|
@ -406,7 +404,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
acc := NewAccumulator(agg, aggC)
|
acc := NewAccumulator(agg, aggC)
|
||||||
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
||||||
a.Config.Agent.Interval.Duration)
|
a.Config.Agent.Interval.Duration)
|
||||||
agg.Run(acc, now, shutdown)
|
agg.Run(acc, shutdown)
|
||||||
}(aggregator)
|
}(aggregator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
|
||||||
// for period ticks to tell it when to push and reset the aggregator.
|
// for period ticks to tell it when to push and reset the aggregator.
|
||||||
func (r *RunningAggregator) Run(
|
func (r *RunningAggregator) Run(
|
||||||
acc telegraf.Accumulator,
|
acc telegraf.Accumulator,
|
||||||
now time.Time,
|
|
||||||
shutdown chan struct{},
|
shutdown chan struct{},
|
||||||
) {
|
) {
|
||||||
// The start of the period is truncated to the nearest second.
|
// The start of the period is truncated to the nearest second.
|
||||||
|
|
@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
|
||||||
// 2nd interval: 00:10 - 00:20.5
|
// 2nd interval: 00:10 - 00:20.5
|
||||||
// etc.
|
// etc.
|
||||||
//
|
//
|
||||||
|
now := time.Now()
|
||||||
r.periodStart = now.Truncate(time.Second)
|
r.periodStart = now.Truncate(time.Second)
|
||||||
truncation := now.Sub(r.periodStart)
|
truncation := now.Sub(r.periodStart)
|
||||||
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.NoError(t, ra.Config.Filter.Compile())
|
assert.NoError(t, ra.Config.Filter.Compile())
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
go ra.Run(&acc, make(chan struct{}))
|
||||||
|
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
"RITest",
|
"RITest",
|
||||||
|
|
@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.NoError(t, ra.Config.Filter.Compile())
|
assert.NoError(t, ra.Config.Filter.Compile())
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
go ra.Run(&acc, make(chan struct{}))
|
||||||
|
|
||||||
// metric before current period
|
// metric before current period
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
|
|
@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ra.Run(&acc, time.Now(), shutdown)
|
ra.Run(&acc, shutdown)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue