Fix aggregator window and shutdown of multiple aggregators (#5644)

This commit is contained in:
Daniel Nelson 2019-03-29 15:40:33 -07:00 committed by GitHub
parent 3045ffbbe3
commit 4e3244c575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 218 additions and 93 deletions

View File

@ -41,11 +41,10 @@ type Accumulator interface {
// AddMetric adds an metric to the accumulator. // AddMetric adds an metric to the accumulator.
AddMetric(Metric) AddMetric(Metric)
// SetPrecision takes two time.Duration objects. If the first is non-zero, // SetPrecision sets the timestamp rounding precision. All metrics addeds
// it sets that as the precision. Otherwise, it takes the second argument // added to the accumulator will have their timestamp rounded to the
// as the order of time that the metrics should be rounded to, with the // nearest multiple of precision.
// maximum being 1s. SetPrecision(precision time.Duration)
SetPrecision(precision, interval time.Duration)
// Report an error. // Report an error.
AddError(err error) AddError(err error)

View File

@ -114,21 +114,8 @@ func (ac *accumulator) AddError(err error) {
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err) log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
} }
func (ac *accumulator) SetPrecision(precision, interval time.Duration) { func (ac *accumulator) SetPrecision(precision time.Duration) {
if precision > 0 { ac.precision = precision
ac.precision = precision
return
}
switch {
case interval >= time.Second:
ac.precision = time.Second
case interval >= time.Millisecond:
ac.precision = time.Millisecond
case interval >= time.Microsecond:
ac.precision = time.Microsecond
default:
ac.precision = time.Nanosecond
}
} }
func (ac *accumulator) getTime(t []time.Time) time.Time { func (ac *accumulator) getTime(t []time.Time) time.Time {

View File

@ -74,7 +74,6 @@ func TestSetPrecision(t *testing.T) {
name string name string
unset bool unset bool
precision time.Duration precision time.Duration
interval time.Duration
timestamp time.Time timestamp time.Time
expected time.Time expected time.Time
}{ }{
@ -86,13 +85,13 @@ func TestSetPrecision(t *testing.T) {
}, },
{ {
name: "second interval", name: "second interval",
interval: time.Second, precision: time.Second,
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC), timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
expected: time.Date(2006, time.February, 10, 12, 0, 0, 0, time.UTC), expected: time.Date(2006, time.February, 10, 12, 0, 0, 0, time.UTC),
}, },
{ {
name: "microsecond interval", name: "microsecond interval",
interval: time.Microsecond, precision: time.Microsecond,
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC), timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
expected: time.Date(2006, time.February, 10, 12, 0, 0, 82913000, time.UTC), expected: time.Date(2006, time.February, 10, 12, 0, 0, 82913000, time.UTC),
}, },
@ -109,7 +108,7 @@ func TestSetPrecision(t *testing.T) {
a := NewAccumulator(&TestMetricMaker{}, metrics) a := NewAccumulator(&TestMetricMaker{}, metrics)
if !tt.unset { if !tt.unset {
a.SetPrecision(tt.precision, tt.interval) a.SetPrecision(tt.precision)
} }
a.AddFields("acctest", a.AddFields("acctest",

View File

@ -180,8 +180,7 @@ func (a *Agent) Test(ctx context.Context) error {
} }
acc := NewAccumulator(input, metricC) acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration, acc.SetPrecision(a.Precision())
a.Config.Agent.Interval.Duration)
input.SetDefaultTags(a.Config.Tags) input.SetDefaultTags(a.Config.Tags)
// Special instructions for some inputs. cpu, for example, needs to be // Special instructions for some inputs. cpu, for example, needs to be
@ -189,8 +188,7 @@ func (a *Agent) Test(ctx context.Context) error {
switch input.Name() { switch input.Name() {
case "inputs.cpu", "inputs.mongodb", "inputs.procstat": case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
nulAcc := NewAccumulator(input, nulC) nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Config.Agent.Precision.Duration, nulAcc.SetPrecision(a.Precision())
a.Config.Agent.Interval.Duration)
if err := input.Input.Gather(nulAcc); err != nil { if err := input.Input.Gather(nulAcc); err != nil {
return err return err
} }
@ -222,7 +220,6 @@ func (a *Agent) runInputs(
var wg sync.WaitGroup var wg sync.WaitGroup
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
interval := a.Config.Agent.Interval.Duration interval := a.Config.Agent.Interval.Duration
precision := a.Config.Agent.Precision.Duration
jitter := a.Config.Agent.CollectionJitter.Duration jitter := a.Config.Agent.CollectionJitter.Duration
// Overwrite agent interval if this plugin has its own. // Overwrite agent interval if this plugin has its own.
@ -231,7 +228,7 @@ func (a *Agent) runInputs(
} }
acc := NewAccumulator(input, dst) acc := NewAccumulator(input, dst)
acc.SetPrecision(precision, interval) acc.SetPrecision(a.Precision())
wg.Add(1) wg.Add(1)
go func(input *models.RunningInput) { go func(input *models.RunningInput) {
@ -339,10 +336,27 @@ func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
return metrics return metrics
} }
// runAggregators triggers the periodic push for Aggregators. func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) {
var until time.Time
if roundInterval {
until = internal.AlignTime(start, period)
if until == start {
until = internal.AlignTime(start.Add(time.Nanosecond), period)
}
} else {
until = start.Add(period)
}
since := until.Add(-period)
return since, until
}
// runAggregators adds metrics to the aggregators and triggers their periodic
// push call.
// //
// When the context is done a final push will occur and then this function // Runs until src is closed and all metrics have been processed. Will call
// will return. // push one final time before returning.
func (a *Agent) runAggregators( func (a *Agent) runAggregators(
startTime time.Time, startTime time.Time,
src <-chan telegraf.Metric, src <-chan telegraf.Metric,
@ -350,6 +364,13 @@ func (a *Agent) runAggregators(
) error { ) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Before calling Add, initialize the aggregation window. This ensures
// that any metric created after start time will be aggregated.
for _, agg := range a.Config.Aggregators {
since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
agg.UpdateWindow(since, until)
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -371,33 +392,29 @@ func (a *Agent) runAggregators(
cancel() cancel()
}() }()
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration
aggregations := make(chan telegraf.Metric, 100) aggregations := make(chan telegraf.Metric, 100)
for _, agg := range a.Config.Aggregators { wg.Add(1)
wg.Add(1) go func() {
go func(agg *models.RunningAggregator) { defer wg.Done()
defer func() {
wg.Done()
close(aggregations)
}()
if a.Config.Agent.RoundInterval { var aggWg sync.WaitGroup
// Aggregators are aligned to the agent interval regardless of for _, agg := range a.Config.Aggregators {
// their period. aggWg.Add(1)
err := internal.SleepContext(ctx, internal.AlignDuration(startTime, interval)) go func(agg *models.RunningAggregator) {
if err != nil { defer aggWg.Done()
return
}
}
agg.SetPeriodStart(startTime) acc := NewAccumulator(agg, aggregations)
acc.SetPrecision(a.Precision())
fmt.Println(1)
a.push(ctx, agg, acc)
fmt.Println(2)
}(agg)
}
acc := NewAccumulator(agg, aggregations) aggWg.Wait()
acc.SetPrecision(precision, interval) fmt.Println(3)
a.push(ctx, agg, acc) close(aggregations)
}(agg) }()
}
for metric := range aggregations { for metric := range aggregations {
metrics := a.applyProcessors(metric) metrics := a.applyProcessors(metric)
@ -405,39 +422,42 @@ func (a *Agent) runAggregators(
dst <- metric dst <- metric
} }
} }
fmt.Println(4)
wg.Wait() wg.Wait()
fmt.Println(5)
return nil return nil
} }
// push runs the push for a single aggregator every period. More simple than // push runs the push for a single aggregator every period.
// the output/input version as timeout should be less likely.... not really
// because the output channel can block for now.
func (a *Agent) push( func (a *Agent) push(
ctx context.Context, ctx context.Context,
aggregator *models.RunningAggregator, aggregator *models.RunningAggregator,
acc telegraf.Accumulator, acc telegraf.Accumulator,
) { ) {
ticker := time.NewTicker(aggregator.Period())
defer ticker.Stop()
for { for {
// Ensures that Push will be called for each period, even if it has
// already elapsed before this function is called. This is guaranteed
// because so long as only Push updates the EndPeriod. This method
// also avoids drift by not using a ticker.
until := time.Until(aggregator.EndPeriod())
select { select {
case <-ticker.C: case <-time.After(until):
aggregator.Push(acc)
break break
case <-ctx.Done(): case <-ctx.Done():
aggregator.Push(acc) aggregator.Push(acc)
return return
} }
aggregator.Push(acc)
} }
} }
// runOutputs triggers the periodic write for Outputs. // runOutputs triggers the periodic write for Outputs.
// //
// When the context is done, outputs continue to run until their buffer is
// closed, afterwich they run flush once more. // Runs until src is closed and all metrics have been processed. Will call
// Write one final time before returning.
func (a *Agent) runOutputs( func (a *Agent) runOutputs(
startTime time.Time, startTime time.Time,
src <-chan telegraf.Metric, src <-chan telegraf.Metric,
@ -608,7 +628,7 @@ func (a *Agent) startServiceInputs(
// Gather() accumulator does apply rounding according to the // Gather() accumulator does apply rounding according to the
// precision agent setting. // precision agent setting.
acc := NewAccumulator(input, dst) acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond, 0) acc.SetPrecision(time.Nanosecond)
err := si.Start(acc) err := si.Start(acc)
if err != nil { if err != nil {
@ -638,6 +658,27 @@ func (a *Agent) stopServiceInputs() {
} }
} }
// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration
if precision > 0 {
return precision
}
switch {
case interval >= time.Second:
return time.Second
case interval >= time.Millisecond:
return time.Millisecond
case interval >= time.Microsecond:
return time.Microsecond
default:
return time.Nanosecond
}
}
// panicRecover displays an error if an input panics. // panicRecover displays an error if an input panics.
func panicRecover(input *models.RunningInput) { func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil { if err := recover(); err != nil {

View File

@ -2,15 +2,13 @@ package agent
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
// needing to load the plugins
_ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/inputs/all"
// needing to load the outputs
_ "github.com/influxdata/telegraf/plugins/outputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAgent_OmitHostname(t *testing.T) { func TestAgent_OmitHostname(t *testing.T) {
@ -109,3 +107,62 @@ func TestAgent_LoadOutput(t *testing.T) {
a, _ = NewAgent(c) a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs)) assert.Equal(t, 3, len(a.Config.Outputs))
} }
func TestWindow(t *testing.T) {
parse := func(s string) time.Time {
tm, err := time.Parse(time.RFC3339, s)
if err != nil {
panic(err)
}
return tm
}
tests := []struct {
name string
start time.Time
roundInterval bool
period time.Duration
since time.Time
until time.Time
}{
{
name: "round with exact alignment",
start: parse("2018-03-27T00:00:00Z"),
roundInterval: true,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "round with alignment needed",
start: parse("2018-03-27T00:00:05Z"),
roundInterval: true,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "no round with exact alignment",
start: parse("2018-03-27T00:00:00Z"),
roundInterval: false,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "no found with alignment needed",
start: parse("2018-03-27T00:00:05Z"),
roundInterval: false,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:05Z"),
until: parse("2018-03-27T00:00:35Z"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
since, until := updateWindow(tt.start, tt.roundInterval, tt.period)
require.Equal(t, tt.since, since, "since")
require.Equal(t, tt.until, until, "until")
})
}
}

View File

@ -288,11 +288,13 @@ func SleepContext(ctx context.Context, duration time.Duration) error {
} }
// AlignDuration returns the duration until next aligned interval. // AlignDuration returns the duration until next aligned interval.
// If the current time is aligned a 0 duration is returned.
func AlignDuration(tm time.Time, interval time.Duration) time.Duration { func AlignDuration(tm time.Time, interval time.Duration) time.Duration {
return AlignTime(tm, interval).Sub(tm) return AlignTime(tm, interval).Sub(tm)
} }
// AlignTime returns the time of the next aligned interval. // AlignTime returns the time of the next aligned interval.
// If the current time is aligned the current time is returned.
func AlignTime(tm time.Time, interval time.Duration) time.Time { func AlignTime(tm time.Time, interval time.Duration) time.Time {
truncated := tm.Truncate(interval) truncated := tm.Truncate(interval)
if truncated == tm { if truncated == tm {

View File

@ -271,6 +271,40 @@ func TestAlignDuration(t *testing.T) {
} }
} }
func TestAlignTime(t *testing.T) {
rfc3339 := func(value string) time.Time {
t, _ := time.Parse(time.RFC3339, value)
return t
}
tests := []struct {
name string
now time.Time
interval time.Duration
expected time.Time
}{
{
name: "aligned",
now: rfc3339("2018-01-01T01:01:00Z"),
interval: 10 * time.Second,
expected: rfc3339("2018-01-01T01:01:00Z"),
},
{
name: "aligned",
now: rfc3339("2018-01-01T01:01:01Z"),
interval: 10 * time.Second,
expected: rfc3339("2018-01-01T01:01:10Z"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := AlignTime(tt.now, tt.interval)
require.Equal(t, tt.expected, actual)
})
}
}
func TestParseTimestamp(t *testing.T) { func TestParseTimestamp(t *testing.T) {
time, err := ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000") time, err := ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000")
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -1,6 +1,7 @@
package models package models
import ( import (
"log"
"sync" "sync"
"time" "time"
@ -74,9 +75,14 @@ func (r *RunningAggregator) Period() time.Duration {
return r.Config.Period return r.Config.Period
} }
func (r *RunningAggregator) SetPeriodStart(start time.Time) { func (r *RunningAggregator) EndPeriod() time.Time {
return r.periodEnd
}
func (r *RunningAggregator) UpdateWindow(start, until time.Time) {
r.periodStart = start r.periodStart = start
r.periodEnd = r.periodStart.Add(r.Config.Period).Add(r.Config.Delay) r.periodEnd = until
log.Printf("D! [%s] Updated aggregation range [%s, %s]", r.Name(), start, until)
} }
func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric {
@ -97,10 +103,6 @@ func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return m return m
} }
func (r *RunningAggregator) metricDropped(metric telegraf.Metric) {
r.MetricsDropped.Incr(1)
}
// Add a metric to the aggregator and return true if the original metric // Add a metric to the aggregator and return true if the original metric
// should be dropped. // should be dropped.
func (r *RunningAggregator) Add(m telegraf.Metric) bool { func (r *RunningAggregator) Add(m telegraf.Metric) bool {
@ -108,22 +110,25 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
return false return false
} }
// Make a copy of the metric but don't retain tracking; it doesn't make // Make a copy of the metric but don't retain tracking. We do not fail a
// sense to fail a metric's delivery due to the aggregation not being // delivery due to the aggregation not being sent because we can't create
// sent because we can't create aggregations of historical data. // aggregations of historical data. Additionally, waiting for the
// aggregation to be pushed would introduce a hefty latency to delivery.
m = metric.FromMetric(m) m = metric.FromMetric(m)
r.Config.Filter.Modify(m) r.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 { if len(m.FieldList()) == 0 {
r.metricDropped(m) r.MetricsFiltered.Incr(1)
return r.Config.DropOriginal return r.Config.DropOriginal
} }
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.periodStart.IsZero() || m.Time().After(r.periodEnd) { if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
r.metricDropped(m) log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal return r.Config.DropOriginal
} }
@ -135,8 +140,10 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.periodStart = r.periodEnd since := r.periodEnd
r.periodEnd = r.periodStart.Add(r.Config.Period).Add(r.Config.Delay) until := r.periodEnd.Add(r.Config.Period)
r.UpdateWindow(since, until)
r.push(acc) r.push(acc)
r.Aggregator.Reset() r.Aggregator.Reset()
} }

View File

@ -23,7 +23,7 @@ func TestAdd(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
now := time.Now() now := time.Now()
ra.SetPeriodStart(now) ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest", m := testutil.MustMetric("RITest",
map[string]string{}, map[string]string{},
@ -51,7 +51,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
require.NoError(t, ra.Config.Filter.Compile()) require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
now := time.Now() now := time.Now()
ra.SetPeriodStart(now) ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest", m := testutil.MustMetric("RITest",
map[string]string{}, map[string]string{},
@ -86,7 +86,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
ra.Push(&acc) ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics)) require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(202), acc.Metrics[0].Fields["sum"]) require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
} }
func TestAddAndPushOnePeriod(t *testing.T) { func TestAddAndPushOnePeriod(t *testing.T) {
@ -102,7 +102,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
now := time.Now() now := time.Now()
ra.SetPeriodStart(now) ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest", m := testutil.MustMetric("RITest",
map[string]string{}, map[string]string{},
@ -129,7 +129,7 @@ func TestAddDropOriginal(t *testing.T) {
require.NoError(t, ra.Config.Filter.Compile()) require.NoError(t, ra.Config.Filter.Compile())
now := time.Now() now := time.Now()
ra.SetPeriodStart(now) ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest", m := testutil.MustMetric("RITest",
map[string]string{}, map[string]string{},

View File

@ -10,7 +10,6 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -204,7 +203,7 @@ func (a *Accumulator) AddError(err error) {
a.Unlock() a.Unlock()
} }
func (a *Accumulator) SetPrecision(precision, interval time.Duration) { func (a *Accumulator) SetPrecision(precision time.Duration) {
return return
} }