From 4e3244c57513e0599cf6ff53d780ce649f5c31d5 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Fri, 29 Mar 2019 15:40:33 -0700 Subject: [PATCH] Fix aggregator window and shutdown of multiple aggregators (#5644) --- accumulator.go | 9 +- agent/accumulator.go | 17 +-- agent/accumulator_test.go | 7 +- agent/agent.go | 129 ++++++++++++++------- agent/agent_test.go | 65 ++++++++++- internal/internal.go | 2 + internal/internal_test.go | 34 ++++++ internal/models/running_aggregator.go | 35 +++--- internal/models/running_aggregator_test.go | 10 +- testutil/accumulator.go | 3 +- 10 files changed, 218 insertions(+), 93 deletions(-) diff --git a/accumulator.go b/accumulator.go index 825455c4c..1ea5737a8 100644 --- a/accumulator.go +++ b/accumulator.go @@ -41,11 +41,10 @@ type Accumulator interface { // AddMetric adds an metric to the accumulator. AddMetric(Metric) - // SetPrecision takes two time.Duration objects. If the first is non-zero, - // it sets that as the precision. Otherwise, it takes the second argument - // as the order of time that the metrics should be rounded to, with the - // maximum being 1s. - SetPrecision(precision, interval time.Duration) + // SetPrecision sets the timestamp rounding precision. All metrics + // added to the accumulator will have their timestamp rounded to the + // nearest multiple of precision. + SetPrecision(precision time.Duration) // Report an error. AddError(err error) diff --git a/agent/accumulator.go b/agent/accumulator.go index 0533a06e2..9e0bb11ca 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -114,21 +114,8 @@ func (ac *accumulator) AddError(err error) { log.Printf("E! 
[%s]: Error in plugin: %v", ac.maker.Name(), err) } -func (ac *accumulator) SetPrecision(precision, interval time.Duration) { - if precision > 0 { - ac.precision = precision - return - } - switch { - case interval >= time.Second: - ac.precision = time.Second - case interval >= time.Millisecond: - ac.precision = time.Millisecond - case interval >= time.Microsecond: - ac.precision = time.Microsecond - default: - ac.precision = time.Nanosecond - } +func (ac *accumulator) SetPrecision(precision time.Duration) { + ac.precision = precision } func (ac *accumulator) getTime(t []time.Time) time.Time { diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 316ad124b..933821701 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -74,7 +74,6 @@ func TestSetPrecision(t *testing.T) { name string unset bool precision time.Duration - interval time.Duration timestamp time.Time expected time.Time }{ @@ -86,13 +85,13 @@ func TestSetPrecision(t *testing.T) { }, { name: "second interval", - interval: time.Second, + precision: time.Second, timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC), expected: time.Date(2006, time.February, 10, 12, 0, 0, 0, time.UTC), }, { name: "microsecond interval", - interval: time.Microsecond, + precision: time.Microsecond, timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC), expected: time.Date(2006, time.February, 10, 12, 0, 0, 82913000, time.UTC), }, @@ -109,7 +108,7 @@ func TestSetPrecision(t *testing.T) { a := NewAccumulator(&TestMetricMaker{}, metrics) if !tt.unset { - a.SetPrecision(tt.precision, tt.interval) + a.SetPrecision(tt.precision) } a.AddFields("acctest", diff --git a/agent/agent.go b/agent/agent.go index d83748811..ae2de85bf 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -180,8 +180,7 @@ func (a *Agent) Test(ctx context.Context) error { } acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - 
a.Config.Agent.Interval.Duration) + acc.SetPrecision(a.Precision()) input.SetDefaultTags(a.Config.Tags) // Special instructions for some inputs. cpu, for example, needs to be @@ -189,8 +188,7 @@ func (a *Agent) Test(ctx context.Context) error { switch input.Name() { case "inputs.cpu", "inputs.mongodb", "inputs.procstat": nulAcc := NewAccumulator(input, nulC) - nulAcc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) + nulAcc.SetPrecision(a.Precision()) if err := input.Input.Gather(nulAcc); err != nil { return err } @@ -222,7 +220,6 @@ func (a *Agent) runInputs( var wg sync.WaitGroup for _, input := range a.Config.Inputs { interval := a.Config.Agent.Interval.Duration - precision := a.Config.Agent.Precision.Duration jitter := a.Config.Agent.CollectionJitter.Duration // Overwrite agent interval if this plugin has its own. @@ -231,7 +228,7 @@ func (a *Agent) runInputs( } acc := NewAccumulator(input, dst) - acc.SetPrecision(precision, interval) + acc.SetPrecision(a.Precision()) wg.Add(1) go func(input *models.RunningInput) { @@ -339,10 +336,27 @@ func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric { return metrics } -// runAggregators triggers the periodic push for Aggregators. +func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) { + var until time.Time + if roundInterval { + until = internal.AlignTime(start, period) + if until == start { + until = internal.AlignTime(start.Add(time.Nanosecond), period) + } + } else { + until = start.Add(period) + } + + since := until.Add(-period) + + return since, until +} + +// runAggregators adds metrics to the aggregators and triggers their periodic +// push call. // -// When the context is done a final push will occur and then this function -// will return. +// Runs until src is closed and all metrics have been processed. Will call +// push one final time before returning. 
func (a *Agent) runAggregators( startTime time.Time, src <-chan telegraf.Metric, @@ -350,6 +364,13 @@ func (a *Agent) runAggregators( ) error { ctx, cancel := context.WithCancel(context.Background()) + // Before calling Add, initialize the aggregation window. This ensures + // that any metric created after start time will be aggregated. + for _, agg := range a.Config.Aggregators { + since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period()) + agg.UpdateWindow(since, until) + } + var wg sync.WaitGroup wg.Add(1) go func() { @@ -371,33 +392,29 @@ func (a *Agent) runAggregators( cancel() }() - precision := a.Config.Agent.Precision.Duration - interval := a.Config.Agent.Interval.Duration aggregations := make(chan telegraf.Metric, 100) - for _, agg := range a.Config.Aggregators { - wg.Add(1) - go func(agg *models.RunningAggregator) { - defer func() { - wg.Done() - close(aggregations) - }() + wg.Add(1) + go func() { + defer wg.Done() - if a.Config.Agent.RoundInterval { - // Aggregators are aligned to the agent interval regardless of - // their period. - err := internal.SleepContext(ctx, internal.AlignDuration(startTime, interval)) - if err != nil { - return - } - } + var aggWg sync.WaitGroup + for _, agg := range a.Config.Aggregators { + aggWg.Add(1) + go func(agg *models.RunningAggregator) { + defer aggWg.Done() - agg.SetPeriodStart(startTime) + acc := NewAccumulator(agg, aggregations) + acc.SetPrecision(a.Precision()) + fmt.Println(1) + a.push(ctx, agg, acc) + fmt.Println(2) + }(agg) + } - acc := NewAccumulator(agg, aggregations) - acc.SetPrecision(precision, interval) - a.push(ctx, agg, acc) - }(agg) - } + aggWg.Wait() + fmt.Println(3) + close(aggregations) + }() for metric := range aggregations { metrics := a.applyProcessors(metric) @@ -405,39 +422,42 @@ func (a *Agent) runAggregators( dst <- metric } } + fmt.Println(4) wg.Wait() + fmt.Println(5) return nil } -// push runs the push for a single aggregator every period. 
More simple than -// the output/input version as timeout should be less likely.... not really -// because the output channel can block for now. +// push runs the push for a single aggregator every period. func (a *Agent) push( ctx context.Context, aggregator *models.RunningAggregator, acc telegraf.Accumulator, ) { - ticker := time.NewTicker(aggregator.Period()) - defer ticker.Stop() - for { + // Ensures that Push will be called for each period, even if it has + // already elapsed before this function is called. This is guaranteed + // so long as only Push updates the EndPeriod. This method + // also avoids drift by not using a ticker. + until := time.Until(aggregator.EndPeriod()) + select { - case <-ticker.C: + case <-time.After(until): + aggregator.Push(acc) break case <-ctx.Done(): aggregator.Push(acc) return } - - aggregator.Push(acc) } } // runOutputs triggers the periodic write for Outputs. // -// When the context is done, outputs continue to run until their buffer is -// closed, afterwich they run flush once more. + +// Runs until src is closed and all metrics have been processed. Will call +// Write one final time before returning. func (a *Agent) runOutputs( startTime time.Time, src <-chan telegraf.Metric, @@ -608,7 +628,7 @@ func (a *Agent) startServiceInputs( // Gather() accumulator does apply rounding according to the // precision agent setting. acc := NewAccumulator(input, dst) - acc.SetPrecision(time.Nanosecond, 0) + acc.SetPrecision(time.Nanosecond) err := si.Start(acc) if err != nil { @@ -638,6 +658,27 @@ func (a *Agent) stopServiceInputs() { } } +// Returns the rounding precision for metrics. 
+func (a *Agent) Precision() time.Duration { + precision := a.Config.Agent.Precision.Duration + interval := a.Config.Agent.Interval.Duration + + if precision > 0 { + return precision + } + + switch { + case interval >= time.Second: + return time.Second + case interval >= time.Millisecond: + return time.Millisecond + case interval >= time.Microsecond: + return time.Microsecond + default: + return time.Nanosecond + } +} + // panicRecover displays an error if an input panics. func panicRecover(input *models.RunningInput) { if err := recover(); err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index a5920ce1c..c822a236b 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -2,15 +2,13 @@ package agent import ( "testing" + "time" "github.com/influxdata/telegraf/internal/config" - - // needing to load the plugins _ "github.com/influxdata/telegraf/plugins/inputs/all" - // needing to load the outputs _ "github.com/influxdata/telegraf/plugins/outputs/all" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAgent_OmitHostname(t *testing.T) { @@ -109,3 +107,62 @@ func TestAgent_LoadOutput(t *testing.T) { a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) } + +func TestWindow(t *testing.T) { + parse := func(s string) time.Time { + tm, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return tm + } + + tests := []struct { + name string + start time.Time + roundInterval bool + period time.Duration + since time.Time + until time.Time + }{ + { + name: "round with exact alignment", + start: parse("2018-03-27T00:00:00Z"), + roundInterval: true, + period: 30 * time.Second, + since: parse("2018-03-27T00:00:00Z"), + until: parse("2018-03-27T00:00:30Z"), + }, + { + name: "round with alignment needed", + start: parse("2018-03-27T00:00:05Z"), + roundInterval: true, + period: 30 * time.Second, + since: parse("2018-03-27T00:00:00Z"), + until: parse("2018-03-27T00:00:30Z"), + }, + { + name: "no 
round with exact alignment", + start: parse("2018-03-27T00:00:00Z"), + roundInterval: false, + period: 30 * time.Second, + since: parse("2018-03-27T00:00:00Z"), + until: parse("2018-03-27T00:00:30Z"), + }, + { + name: "no found with alignment needed", + start: parse("2018-03-27T00:00:05Z"), + roundInterval: false, + period: 30 * time.Second, + since: parse("2018-03-27T00:00:05Z"), + until: parse("2018-03-27T00:00:35Z"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + since, until := updateWindow(tt.start, tt.roundInterval, tt.period) + require.Equal(t, tt.since, since, "since") + require.Equal(t, tt.until, until, "until") + }) + } +} diff --git a/internal/internal.go b/internal/internal.go index b373c9c35..133b19e9b 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -288,11 +288,13 @@ func SleepContext(ctx context.Context, duration time.Duration) error { } // AlignDuration returns the duration until next aligned interval. +// If the current time is aligned a 0 duration is returned. func AlignDuration(tm time.Time, interval time.Duration) time.Duration { return AlignTime(tm, interval).Sub(tm) } // AlignTime returns the time of the next aligned interval. +// If the current time is aligned the current time is returned. 
func AlignTime(tm time.Time, interval time.Duration) time.Time { truncated := tm.Truncate(interval) if truncated == tm { diff --git a/internal/internal_test.go b/internal/internal_test.go index 681e1f808..da2fe01c5 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -271,6 +271,40 @@ func TestAlignDuration(t *testing.T) { } } +func TestAlignTime(t *testing.T) { + rfc3339 := func(value string) time.Time { + t, _ := time.Parse(time.RFC3339, value) + return t + } + + tests := []struct { + name string + now time.Time + interval time.Duration + expected time.Time + }{ + { + name: "aligned", + now: rfc3339("2018-01-01T01:01:00Z"), + interval: 10 * time.Second, + expected: rfc3339("2018-01-01T01:01:00Z"), + }, + { + name: "aligned", + now: rfc3339("2018-01-01T01:01:01Z"), + interval: 10 * time.Second, + expected: rfc3339("2018-01-01T01:01:10Z"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := AlignTime(tt.now, tt.interval) + require.Equal(t, tt.expected, actual) + }) + } +} + func TestParseTimestamp(t *testing.T) { time, err := ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000") assert.Nil(t, err) diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index f54b5266e..8a2cd576a 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -1,6 +1,7 @@ package models import ( + "log" "sync" "time" @@ -74,9 +75,14 @@ func (r *RunningAggregator) Period() time.Duration { return r.Config.Period } -func (r *RunningAggregator) SetPeriodStart(start time.Time) { +func (r *RunningAggregator) EndPeriod() time.Time { + return r.periodEnd +} + +func (r *RunningAggregator) UpdateWindow(start, until time.Time) { r.periodStart = start - r.periodEnd = r.periodStart.Add(r.Config.Period).Add(r.Config.Delay) + r.periodEnd = until + log.Printf("D! 
[%s] Updated aggregation range [%s, %s]", r.Name(), start, until) } func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { @@ -97,10 +103,6 @@ func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { return m } -func (r *RunningAggregator) metricDropped(metric telegraf.Metric) { - r.MetricsDropped.Incr(1) -} - // Add a metric to the aggregator and return true if the original metric // should be dropped. func (r *RunningAggregator) Add(m telegraf.Metric) bool { @@ -108,22 +110,25 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool { return false } - // Make a copy of the metric but don't retain tracking; it doesn't make - // sense to fail a metric's delivery due to the aggregation not being - // sent because we can't create aggregations of historical data. + // Make a copy of the metric but don't retain tracking. We do not fail a + // delivery due to the aggregation not being sent because we can't create + // aggregations of historical data. Additionally, waiting for the + // aggregation to be pushed would introduce a hefty latency to delivery. m = metric.FromMetric(m) r.Config.Filter.Modify(m) if len(m.FieldList()) == 0 { - r.metricDropped(m) + r.MetricsFiltered.Incr(1) return r.Config.DropOriginal } r.Lock() defer r.Unlock() - if r.periodStart.IsZero() || m.Time().After(r.periodEnd) { - r.metricDropped(m) + if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) { + log.Printf("D! [%s] metric is outside aggregation window; discarding. 
%s: m: %s e: %s", + r.Name(), m.Time(), r.periodStart, r.periodEnd) + r.MetricsDropped.Incr(1) return r.Config.DropOriginal } @@ -135,8 +140,10 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { r.Lock() defer r.Unlock() - r.periodStart = r.periodEnd - r.periodEnd = r.periodStart.Add(r.Config.Period).Add(r.Config.Delay) + since := r.periodEnd + until := r.periodEnd.Add(r.Config.Period) + r.UpdateWindow(since, until) + r.push(acc) r.Aggregator.Reset() } diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index 76c7e4e5d..19476eecf 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -23,7 +23,7 @@ func TestAdd(t *testing.T) { acc := testutil.Accumulator{} now := time.Now() - ra.SetPeriodStart(now) + ra.UpdateWindow(now, now.Add(ra.Config.Period)) m := testutil.MustMetric("RITest", map[string]string{}, @@ -51,7 +51,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { require.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} now := time.Now() - ra.SetPeriodStart(now) + ra.UpdateWindow(now, now.Add(ra.Config.Period)) m := testutil.MustMetric("RITest", map[string]string{}, @@ -86,7 +86,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { ra.Push(&acc) require.Equal(t, 1, len(acc.Metrics)) - require.Equal(t, int64(202), acc.Metrics[0].Fields["sum"]) + require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"]) } func TestAddAndPushOnePeriod(t *testing.T) { @@ -102,7 +102,7 @@ func TestAddAndPushOnePeriod(t *testing.T) { acc := testutil.Accumulator{} now := time.Now() - ra.SetPeriodStart(now) + ra.UpdateWindow(now, now.Add(ra.Config.Period)) m := testutil.MustMetric("RITest", map[string]string{}, @@ -129,7 +129,7 @@ func TestAddDropOriginal(t *testing.T) { require.NoError(t, ra.Config.Filter.Compile()) now := time.Now() - ra.SetPeriodStart(now) + ra.UpdateWindow(now, now.Add(ra.Config.Period)) m := 
testutil.MustMetric("RITest", map[string]string{}, diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 3fe291699..a7b9fe8f6 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -10,7 +10,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/stretchr/testify/assert" ) @@ -204,7 +203,7 @@ func (a *Accumulator) AddError(err error) { a.Unlock() } -func (a *Accumulator) SetPrecision(precision, interval time.Duration) { +func (a *Accumulator) SetPrecision(precision time.Duration) { return }