Align aggregator period with internal ticker to avoid skipping metrics (#3693)
By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics.
This commit is contained in:
parent
0f55d9eba2
commit
601dc99606
|
@ -364,8 +364,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
metricC := make(chan telegraf.Metric, 100)
|
metricC := make(chan telegraf.Metric, 100)
|
||||||
aggC := make(chan telegraf.Metric, 100)
|
aggC := make(chan telegraf.Metric, 100)
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
// Start all ServicePlugins
|
// Start all ServicePlugins
|
||||||
for _, input := range a.Config.Inputs {
|
for _, input := range a.Config.Inputs {
|
||||||
input.SetDefaultTags(a.Config.Tags)
|
input.SetDefaultTags(a.Config.Tags)
|
||||||
|
@ -406,7 +404,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
acc := NewAccumulator(agg, aggC)
|
acc := NewAccumulator(agg, aggC)
|
||||||
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
||||||
a.Config.Agent.Interval.Duration)
|
a.Config.Agent.Interval.Duration)
|
||||||
agg.Run(acc, now, shutdown)
|
agg.Run(acc, shutdown)
|
||||||
}(aggregator)
|
}(aggregator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
|
||||||
// for period ticks to tell it when to push and reset the aggregator.
|
// for period ticks to tell it when to push and reset the aggregator.
|
||||||
func (r *RunningAggregator) Run(
|
func (r *RunningAggregator) Run(
|
||||||
acc telegraf.Accumulator,
|
acc telegraf.Accumulator,
|
||||||
now time.Time,
|
|
||||||
shutdown chan struct{},
|
shutdown chan struct{},
|
||||||
) {
|
) {
|
||||||
// The start of the period is truncated to the nearest second.
|
// The start of the period is truncated to the nearest second.
|
||||||
|
@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
|
||||||
// 2nd interval: 00:10 - 00:20.5
|
// 2nd interval: 00:10 - 00:20.5
|
||||||
// etc.
|
// etc.
|
||||||
//
|
//
|
||||||
|
now := time.Now()
|
||||||
r.periodStart = now.Truncate(time.Second)
|
r.periodStart = now.Truncate(time.Second)
|
||||||
truncation := now.Sub(r.periodStart)
|
truncation := now.Sub(r.periodStart)
|
||||||
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
||||||
|
|
|
@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.NoError(t, ra.Config.Filter.Compile())
|
assert.NoError(t, ra.Config.Filter.Compile())
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
go ra.Run(&acc, make(chan struct{}))
|
||||||
|
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
"RITest",
|
"RITest",
|
||||||
|
@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.NoError(t, ra.Config.Filter.Compile())
|
assert.NoError(t, ra.Config.Filter.Compile())
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
go ra.Run(&acc, make(chan struct{}))
|
||||||
|
|
||||||
// metric before current period
|
// metric before current period
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
|
@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ra.Run(&acc, time.Now(), shutdown)
|
ra.Run(&acc, shutdown)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
m := ra.MakeMetric(
|
m := ra.MakeMetric(
|
||||||
|
|
Loading…
Reference in New Issue