diff --git a/agent/agent.go b/agent/agent.go index b68c55d13..9ac51471a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -265,57 +265,45 @@ func (a *Agent) runInputs( interval = input.Config.Interval } + var ticker Ticker + if a.Config.Agent.RoundInterval { + ticker = NewAlignedTicker(startTime, interval, jitter) + } else { + ticker = NewUnalignedTicker(interval, jitter) + } + defer ticker.Stop() + acc := NewAccumulator(input, dst) acc.SetPrecision(a.Precision()) wg.Add(1) go func(input *models.RunningInput) { defer wg.Done() - - if a.Config.Agent.RoundInterval { - err := internal.SleepContext( - ctx, internal.AlignDuration(startTime, interval)) - if err != nil { - return - } - } - - a.gatherOnInterval(ctx, acc, input, interval, jitter) + a.gatherLoop(ctx, acc, input, ticker) }(input) } - wg.Wait() + wg.Wait() return nil } // gather runs an input's gather function periodically until the context is // done. -func (a *Agent) gatherOnInterval( +func (a *Agent) gatherLoop( ctx context.Context, acc telegraf.Accumulator, input *models.RunningInput, - interval time.Duration, - jitter time.Duration, + ticker Ticker, ) { defer panicRecover(input) - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - err := internal.SleepContext(ctx, internal.RandomDuration(jitter)) - if err != nil { - return - } - - err = a.gatherOnce(acc, input, interval) - if err != nil { - acc.AddError(err) - } - select { - case <-ticker.C: - continue + case <-ticker.Elapsed(): + err := a.gatherOnce(acc, input, ticker) + if err != nil { + acc.AddError(err) + } case <-ctx.Done(): return } @@ -327,11 +315,8 @@ func (a *Agent) gatherOnInterval( func (a *Agent) gatherOnce( acc telegraf.Accumulator, input *models.RunningInput, - timeout time.Duration, + ticker Ticker, ) error { - ticker := time.NewTicker(timeout) - defer ticker.Stop() - done := make(chan error) go func() { done <- input.Gather(acc) @@ -341,7 +326,7 @@ func (a *Agent) gatherOnce( select { case err := <-done: return err - case <-ticker.C: + case <-ticker.Elapsed(): log.Printf("W! [agent] [%s] did not complete within its interval", input.LogName()) } @@ -514,10 +499,13 @@ func (a *Agent) runOutputs( jitter = *output.Config.FlushJitter } + ticker := NewRollingTicker(interval, jitter) + defer ticker.Stop() + wg.Add(1) go func(output *models.RunningOutput) { defer wg.Done() - a.flushLoop(ctx, startTime, output, interval, jitter) + a.flushLoop(ctx, output, ticker) }(output) } @@ -542,10 +530,8 @@ func (a *Agent) runOutputs( // done. func (a *Agent) flushLoop( ctx context.Context, - startTime time.Time, output *models.RunningOutput, - interval time.Duration, - jitter time.Duration, + ticker Ticker, ) { logError := func(err error) { if err != nil { @@ -558,44 +544,30 @@ func (a *Agent) flushLoop( watchForFlushSignal(flushRequested) defer stopListeningForFlushSignal(flushRequested) - // align to round interval - if a.Config.Agent.RoundInterval { - err := internal.SleepContext( - ctx, internal.AlignDuration(startTime, interval)) - if err != nil { - return - } - } - - // since we are watching two channels we need a ticker with the jitter - // integrated. - ticker := NewTicker(interval, jitter) - defer ticker.Stop() - for { // Favor shutdown over other methods. select { case <-ctx.Done(): - logError(a.flushOnce(output, interval, output.Write)) + logError(a.flushOnce(output, ticker, output.Write)) return default: } select { case <-ctx.Done(): - logError(a.flushOnce(output, interval, output.Write)) + logError(a.flushOnce(output, ticker, output.Write)) return - case <-ticker.C: - logError(a.flushOnce(output, interval, output.Write)) + case <-ticker.Elapsed(): + logError(a.flushOnce(output, ticker, output.Write)) case <-flushRequested: - logError(a.flushOnce(output, interval, output.Write)) + logError(a.flushOnce(output, ticker, output.Write)) case <-output.BatchReady: // Favor the ticker over batch ready select { - case <-ticker.C: - logError(a.flushOnce(output, interval, output.Write)) + case <-ticker.Elapsed(): + logError(a.flushOnce(output, ticker, output.Write)) default: - logError(a.flushOnce(output, interval, output.WriteBatch)) + logError(a.flushOnce(output, ticker, output.WriteBatch)) } } } @@ -605,12 +577,9 @@ func (a *Agent) flushLoop( // interval it fails to complete before. func (a *Agent) flushOnce( output *models.RunningOutput, - timeout time.Duration, + ticker Ticker, writeFunc func() error, ) error { - ticker := time.NewTicker(timeout) - defer ticker.Stop() - done := make(chan error) go func() { done <- writeFunc() @@ -621,7 +590,7 @@ func (a *Agent) flushOnce( case err := <-done: output.LogBufferStatus() return err - case <-ticker.C: + case <-ticker.Elapsed(): log.Printf("W! [agent] [%q] did not complete within its flush interval", output.LogName()) output.LogBufferStatus() diff --git a/agent/tick.go b/agent/tick.go index 64dbff50b..93e3a3d76 100644 --- a/agent/tick.go +++ b/agent/tick.go @@ -5,53 +5,264 @@ import ( "sync" "time" + "github.com/benbjohnson/clock" "github.com/influxdata/telegraf/internal" ) -type Ticker struct { - C chan time.Time - ticker *time.Ticker - jitter time.Duration - wg sync.WaitGroup - cancelFunc context.CancelFunc +type empty struct{} + +type Ticker interface { + Elapsed() <-chan time.Time + Stop() } -func NewTicker( - interval time.Duration, - jitter time.Duration, -) *Ticker { - ctx, cancel := context.WithCancel(context.Background()) +// AlignedTicker delivers ticks at aligned times plus an optional jitter. Each +// tick is realigned to avoid drift and handle changes to the system clock. +// +// The ticks may have an jitter duration applied to them as an random offset to +// the interval. However the overall pace of is that of the interval, so on +// average you will have one collection each interval. +// +// The first tick is emitted at the next alignment. +// +// Ticks are dropped for slow consumers. +// +// The implementation currently does not recalculate until the next tick with +// no maximum sleep, when using large intervals alignment is not corrected +// until the next tick. +type AlignedTicker struct { + interval time.Duration + jitter time.Duration + ch chan time.Time + cancel context.CancelFunc + wg sync.WaitGroup +} - t := &Ticker{ - C: make(chan time.Time, 1), - ticker: time.NewTicker(interval), - jitter: jitter, - cancelFunc: cancel, +func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker { + return newAlignedTicker(now, interval, jitter, clock.New()) +} + +func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker { + ctx, cancel := context.WithCancel(context.Background()) + t := &AlignedTicker{ + interval: interval, + jitter: jitter, + ch: make(chan time.Time, 1), + cancel: cancel, } + d := t.next(now) + timer := clock.Timer(d) + t.wg.Add(1) - go t.relayTime(ctx) + go func() { + defer t.wg.Done() + t.run(ctx, timer) + }() return t } -func (t *Ticker) Stop() { - t.cancelFunc() - t.wg.Wait() +func (t *AlignedTicker) next(now time.Time) time.Duration { + next := internal.AlignTime(now, t.interval) + d := next.Sub(now) + if d == 0 { + d = t.interval + } + d += internal.RandomDuration(t.jitter) + return d } -func (t *Ticker) relayTime(ctx context.Context) { - defer t.wg.Done() +func (t *AlignedTicker) run(ctx context.Context, timer *clock.Timer) { for { select { - case tm := <-t.ticker.C: - internal.SleepContext(ctx, internal.RandomDuration(t.jitter)) + case <-ctx.Done(): + timer.Stop() + return + case now := <-timer.C: select { - case t.C <- tm: + case t.ch <- now: default: } - case <-ctx.Done(): - return + + d := t.next(now) + timer.Reset(d) } } } + +func (t *AlignedTicker) Elapsed() <-chan time.Time { + return t.ch +} + +func (t *AlignedTicker) Stop() { + t.cancel() + t.wg.Wait() +} + +// UnalignedTicker delivers ticks at regular but unaligned intervals. No +// effort is made to avoid drift. +// +// The ticks may have an jitter duration applied to them as an random offset to +// the interval. However the overall pace of is that of the interval, so on +// average you will have one collection each interval. +// +// The first tick is emitted immediately. +// +// Ticks are dropped for slow consumers. +type UnalignedTicker struct { + interval time.Duration + jitter time.Duration + ch chan time.Time + cancel context.CancelFunc + wg sync.WaitGroup +} + +func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker { + return newUnalignedTicker(interval, jitter, clock.New()) +} + +func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker { + ctx, cancel := context.WithCancel(context.Background()) + t := &UnalignedTicker{ + interval: interval, + jitter: jitter, + ch: make(chan time.Time, 1), + cancel: cancel, + } + + ticker := clock.Ticker(t.interval) + t.ch <- clock.Now() + + t.wg.Add(1) + go func() { + defer t.wg.Done() + t.run(ctx, ticker, clock) + }() + + return t +} + +func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error { + if duration == 0 { + return nil + } + + t := clock.Timer(duration) + select { + case <-t.C: + return nil + case <-ctx.Done(): + t.Stop() + return ctx.Err() + } +} + +func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + jitter := internal.RandomDuration(t.jitter) + err := sleep(ctx, jitter, clock) + if err != nil { + ticker.Stop() + return + } + select { + case t.ch <- clock.Now(): + default: + } + } + } +} + +func (t *UnalignedTicker) InjectTick() { + t.ch <- time.Now() +} + +func (t *UnalignedTicker) Elapsed() <-chan time.Time { + return t.ch +} + +func (t *UnalignedTicker) Stop() { + t.cancel() + t.wg.Wait() +} + +// RollingTicker delivers ticks at regular but unaligned intervals. +// +// Because the next interval is scheduled based on the interval + jitter, you +// are guaranteed at least interval seconds without missing a tick and ticks +// will be evenly scheduled over time. +// +// On average you will have one collection each interval + (jitter/2). +// +// The first tick is emitted after interval+jitter seconds. +// +// Ticks are dropped for slow consumers. +type RollingTicker struct { + interval time.Duration + jitter time.Duration + ch chan time.Time + cancel context.CancelFunc + wg sync.WaitGroup +} + +func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { + return newRollingTicker(interval, jitter, clock.New()) +} + +func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker { + ctx, cancel := context.WithCancel(context.Background()) + t := &RollingTicker{ + interval: interval, + jitter: jitter, + ch: make(chan time.Time, 1), + cancel: cancel, + } + + d := t.next() + timer := clock.Timer(d) + + t.wg.Add(1) + go func() { + defer t.wg.Done() + t.run(ctx, timer) + }() + + return t +} + +func (t *RollingTicker) next() time.Duration { + return t.interval + internal.RandomDuration(t.jitter) +} + +func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) { + for { + select { + case <-ctx.Done(): + timer.Stop() + return + case now := <-timer.C: + select { + case t.ch <- now: + default: + } + + d := t.next() + timer.Reset(d) + } + } +} + +func (t *RollingTicker) Elapsed() <-chan time.Time { + return t.ch +} + +func (t *RollingTicker) Stop() { + t.cancel() + t.wg.Wait() +} diff --git a/agent/tick_test.go b/agent/tick_test.go new file mode 100644 index 000000000..6e9755ceb --- /dev/null +++ b/agent/tick_test.go @@ -0,0 +1,251 @@ +package agent + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/require" +) + +var format = "2006-01-02T15:04:05.999Z07:00" + +func TestAlignedTicker(t *testing.T) { + interval := 10 * time.Second + jitter := 0 * time.Second + + clock := clock.NewMock() + since := clock.Now() + until := since.Add(60 * time.Second) + + ticker := newAlignedTicker(since, interval, jitter, clock) + + expected := []time.Time{ + time.Unix(10, 0).UTC(), + time.Unix(20, 0).UTC(), + time.Unix(30, 0).UTC(), + time.Unix(40, 0).UTC(), + time.Unix(50, 0).UTC(), + time.Unix(60, 0).UTC(), + } + + actual := []time.Time{} + for !clock.Now().After(until) { + select { + case tm := <-ticker.Elapsed(): + actual = append(actual, tm.UTC()) + default: + } + clock.Add(10 * time.Second) + } + + require.Equal(t, expected, actual) +} + +func TestAlignedTickerJitter(t *testing.T) { + interval := 10 * time.Second + jitter := 5 * time.Second + + clock := clock.NewMock() + since := clock.Now() + until := since.Add(60 * time.Second) + + ticker := newAlignedTicker(since, interval, jitter, clock) + + last := since + for !clock.Now().After(until) { + select { + case tm := <-ticker.Elapsed(): + require.True(t, tm.Sub(last) <= 15*time.Second) + require.True(t, tm.Sub(last) >= 5*time.Second) + last = last.Add(interval) + default: + } + clock.Add(5 * time.Second) + } +} + +func TestAlignedTickerMissedTick(t *testing.T) { + interval := 10 * time.Second + jitter := 0 * time.Second + + clock := clock.NewMock() + since := clock.Now() + + ticker := newAlignedTicker(since, interval, jitter, clock) + + clock.Add(25 * time.Second) + tm := <-ticker.Elapsed() + require.Equal(t, time.Unix(10, 0).UTC(), tm.UTC()) + clock.Add(5 * time.Second) + tm = <-ticker.Elapsed() + require.Equal(t, time.Unix(30, 0).UTC(), tm.UTC()) +} + +func TestUnalignedTicker(t *testing.T) { + interval := 10 * time.Second + jitter := 0 * time.Second + + clock := clock.NewMock() + clock.Add(1 * time.Second) + since := clock.Now() + until := since.Add(60 * time.Second) + + ticker := newUnalignedTicker(interval, jitter, clock) + + expected := []time.Time{ + time.Unix(1, 0).UTC(), + time.Unix(11, 0).UTC(), + time.Unix(21, 0).UTC(), + time.Unix(31, 0).UTC(), + time.Unix(41, 0).UTC(), + time.Unix(51, 0).UTC(), + time.Unix(61, 0).UTC(), + } + + actual := []time.Time{} + for !clock.Now().After(until) { + select { + case tm := <-ticker.Elapsed(): + actual = append(actual, tm.UTC()) + default: + } + clock.Add(10 * time.Second) + } + + require.Equal(t, expected, actual) +} + +func TestRollingTicker(t *testing.T) { + interval := 10 * time.Second + jitter := 0 * time.Second + + clock := clock.NewMock() + clock.Add(1 * time.Second) + since := clock.Now() + until := since.Add(60 * time.Second) + + ticker := newUnalignedTicker(interval, jitter, clock) + + expected := []time.Time{ + time.Unix(1, 0).UTC(), + time.Unix(11, 0).UTC(), + time.Unix(21, 0).UTC(), + time.Unix(31, 0).UTC(), + time.Unix(41, 0).UTC(), + time.Unix(51, 0).UTC(), + time.Unix(61, 0).UTC(), + } + + actual := []time.Time{} + for !clock.Now().After(until) { + select { + case tm := <-ticker.Elapsed(): + actual = append(actual, tm.UTC()) + default: + } + clock.Add(10 * time.Second) + } + + require.Equal(t, expected, actual) +} + +// Simulates running the Ticker for an hour and displays stats about the +// operation. +func TestAlignedTickerDistribution(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + interval := 10 * time.Second + jitter := 5 * time.Second + + clock := clock.NewMock() + since := clock.Now() + + ticker := newAlignedTicker(since, interval, jitter, clock) + dist := simulatedDist(ticker, clock) + printDist(dist) + require.True(t, 350 < dist.Count) + require.True(t, 9 < dist.Mean() && dist.Mean() < 11) +} + +// Simulates running the Ticker for an hour and displays stats about the +// operation. +func TestUnalignedTickerDistribution(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + interval := 10 * time.Second + jitter := 5 * time.Second + + clock := clock.NewMock() + + ticker := newUnalignedTicker(interval, jitter, clock) + dist := simulatedDist(ticker, clock) + printDist(dist) + require.True(t, 350 < dist.Count) + require.True(t, 9 < dist.Mean() && dist.Mean() < 11) +} + +// Simulates running the Ticker for an hour and displays stats about the +// operation. +func TestRollingTickerDistribution(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + interval := 10 * time.Second + jitter := 5 * time.Second + + clock := clock.NewMock() + + ticker := newRollingTicker(interval, jitter, clock) + dist := simulatedDist(ticker, clock) + printDist(dist) + require.True(t, 275 < dist.Count) + require.True(t, 12 < dist.Mean() && 13 > dist.Mean()) +} + +type Distribution struct { + Buckets [60]int + Count int + Waittime float64 +} + +func (d *Distribution) Mean() float64 { + return d.Waittime / float64(d.Count) +} + +func printDist(dist Distribution) { + for i, count := range dist.Buckets { + fmt.Printf("%2d %s\n", i, strings.Repeat("x", count)) + } + fmt.Printf("Average interval: %f\n", dist.Mean()) + fmt.Printf("Count: %d\n", dist.Count) +} + +func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution { + since := clock.Now() + until := since.Add(1 * time.Hour) + + var dist Distribution + + last := clock.Now() + for !clock.Now().After(until) { + select { + case tm := <-ticker.Elapsed(): + dist.Buckets[tm.Second()] += 1 + dist.Count++ + dist.Waittime += tm.Sub(last).Seconds() + last = tm + default: + clock.Add(1 * time.Second) + } + } + + return dist +} diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 0c7436941..4b811d8b7 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -25,6 +25,7 @@ following works: - github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE) - github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING) - github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) +- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE) - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 4986adc77..9645c925f 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 github.com/armon/go-metrics v0.3.0 // indirect github.com/aws/aws-sdk-go v1.30.9 + github.com/benbjohnson/clock v1.0.0 github.com/bitly/go-hostpool v0.1.0 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/caio/go-tdigest v2.3.0+incompatible // indirect diff --git a/go.sum b/go.sum index d0d30aa18..53073401d 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/armon/go-metrics v0.3.0 h1:B7AQgHi8QSEi4uHu7Sbsga+IJDU+CENgjxoo81vDUq github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= github.com/aws/aws-sdk-go v1.30.9 h1:DntpBUKkchINPDbhEzDRin1eEn1TG9TZFlzWPf0i8to= github.com/aws/aws-sdk-go v1.30.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w= +github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/internal.go b/internal/internal.go index 12e4b3af2..777128f66 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -5,12 +5,11 @@ import ( "bytes" "compress/gzip" "context" - "crypto/rand" "errors" "fmt" "io" "math" - "math/big" + "math/rand" "os" "os/exec" "runtime" @@ -211,12 +210,8 @@ func RandomSleep(max time.Duration, shutdown chan struct{}) { if max == 0 { return } - maxSleep := big.NewInt(max.Nanoseconds()) - var sleepns int64 - if j, err := rand.Int(rand.Reader, maxSleep); err == nil { - sleepns = j.Int64() - } + sleepns := rand.Int63n(max.Nanoseconds()) t := time.NewTimer(time.Nanosecond * time.Duration(sleepns)) select { @@ -234,11 +229,7 @@ func RandomDuration(max time.Duration) time.Duration { return 0 } - var sleepns int64 - maxSleep := big.NewInt(max.Nanoseconds()) - if j, err := rand.Int(rand.Reader, maxSleep); err == nil { - sleepns = j.Int64() - } + sleepns := rand.Int63n(max.Nanoseconds()) return time.Duration(sleepns) }