Rework plugin tickers to prevent drift and spread write ticks (#7390)
This commit is contained in:
		
							parent
							
								
									c8dbf13fc1
								
							
						
					
					
						commit
						fd76c8bf21
					
				|  | @ -265,57 +265,45 @@ func (a *Agent) runInputs( | ||||||
| 			interval = input.Config.Interval | 			interval = input.Config.Interval | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		var ticker Ticker | ||||||
|  | 		if a.Config.Agent.RoundInterval { | ||||||
|  | 			ticker = NewAlignedTicker(startTime, interval, jitter) | ||||||
|  | 		} else { | ||||||
|  | 			ticker = NewUnalignedTicker(interval, jitter) | ||||||
|  | 		} | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 
 | ||||||
| 		acc := NewAccumulator(input, dst) | 		acc := NewAccumulator(input, dst) | ||||||
| 		acc.SetPrecision(a.Precision()) | 		acc.SetPrecision(a.Precision()) | ||||||
| 
 | 
 | ||||||
| 		wg.Add(1) | 		wg.Add(1) | ||||||
| 		go func(input *models.RunningInput) { | 		go func(input *models.RunningInput) { | ||||||
| 			defer wg.Done() | 			defer wg.Done() | ||||||
| 
 | 			a.gatherLoop(ctx, acc, input, ticker) | ||||||
| 			if a.Config.Agent.RoundInterval { |  | ||||||
| 				err := internal.SleepContext( |  | ||||||
| 					ctx, internal.AlignDuration(startTime, interval)) |  | ||||||
| 				if err != nil { |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			a.gatherOnInterval(ctx, acc, input, interval, jitter) |  | ||||||
| 		}(input) | 		}(input) | ||||||
| 	} | 	} | ||||||
| 	wg.Wait() |  | ||||||
| 
 | 
 | ||||||
|  | 	wg.Wait() | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // gather runs an input's gather function periodically until the context is
 | // gather runs an input's gather function periodically until the context is
 | ||||||
| // done.
 | // done.
 | ||||||
| func (a *Agent) gatherOnInterval( | func (a *Agent) gatherLoop( | ||||||
| 	ctx context.Context, | 	ctx context.Context, | ||||||
| 	acc telegraf.Accumulator, | 	acc telegraf.Accumulator, | ||||||
| 	input *models.RunningInput, | 	input *models.RunningInput, | ||||||
| 	interval time.Duration, | 	ticker Ticker, | ||||||
| 	jitter time.Duration, |  | ||||||
| ) { | ) { | ||||||
| 	defer panicRecover(input) | 	defer panicRecover(input) | ||||||
| 
 | 
 | ||||||
| 	ticker := time.NewTicker(interval) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
| 
 |  | ||||||
| 	for { | 	for { | ||||||
| 		err := internal.SleepContext(ctx, internal.RandomDuration(jitter)) | 		select { | ||||||
| 		if err != nil { | 		case <-ticker.Elapsed(): | ||||||
| 			return | 			err := a.gatherOnce(acc, input, ticker) | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		err = a.gatherOnce(acc, input, interval) |  | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				acc.AddError(err) | 				acc.AddError(err) | ||||||
| 			} | 			} | ||||||
| 
 |  | ||||||
| 		select { |  | ||||||
| 		case <-ticker.C: |  | ||||||
| 			continue |  | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  | @ -327,11 +315,8 @@ func (a *Agent) gatherOnInterval( | ||||||
| func (a *Agent) gatherOnce( | func (a *Agent) gatherOnce( | ||||||
| 	acc telegraf.Accumulator, | 	acc telegraf.Accumulator, | ||||||
| 	input *models.RunningInput, | 	input *models.RunningInput, | ||||||
| 	timeout time.Duration, | 	ticker Ticker, | ||||||
| ) error { | ) error { | ||||||
| 	ticker := time.NewTicker(timeout) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
| 
 |  | ||||||
| 	done := make(chan error) | 	done := make(chan error) | ||||||
| 	go func() { | 	go func() { | ||||||
| 		done <- input.Gather(acc) | 		done <- input.Gather(acc) | ||||||
|  | @ -341,7 +326,7 @@ func (a *Agent) gatherOnce( | ||||||
| 		select { | 		select { | ||||||
| 		case err := <-done: | 		case err := <-done: | ||||||
| 			return err | 			return err | ||||||
| 		case <-ticker.C: | 		case <-ticker.Elapsed(): | ||||||
| 			log.Printf("W! [agent] [%s] did not complete within its interval", | 			log.Printf("W! [agent] [%s] did not complete within its interval", | ||||||
| 				input.LogName()) | 				input.LogName()) | ||||||
| 		} | 		} | ||||||
|  | @ -514,10 +499,13 @@ func (a *Agent) runOutputs( | ||||||
| 			jitter = *output.Config.FlushJitter | 			jitter = *output.Config.FlushJitter | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		ticker := NewRollingTicker(interval, jitter) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 
 | ||||||
| 		wg.Add(1) | 		wg.Add(1) | ||||||
| 		go func(output *models.RunningOutput) { | 		go func(output *models.RunningOutput) { | ||||||
| 			defer wg.Done() | 			defer wg.Done() | ||||||
| 			a.flushLoop(ctx, startTime, output, interval, jitter) | 			a.flushLoop(ctx, output, ticker) | ||||||
| 		}(output) | 		}(output) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -542,10 +530,8 @@ func (a *Agent) runOutputs( | ||||||
| // done.
 | // done.
 | ||||||
| func (a *Agent) flushLoop( | func (a *Agent) flushLoop( | ||||||
| 	ctx context.Context, | 	ctx context.Context, | ||||||
| 	startTime time.Time, |  | ||||||
| 	output *models.RunningOutput, | 	output *models.RunningOutput, | ||||||
| 	interval time.Duration, | 	ticker Ticker, | ||||||
| 	jitter time.Duration, |  | ||||||
| ) { | ) { | ||||||
| 	logError := func(err error) { | 	logError := func(err error) { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | @ -558,44 +544,30 @@ func (a *Agent) flushLoop( | ||||||
| 	watchForFlushSignal(flushRequested) | 	watchForFlushSignal(flushRequested) | ||||||
| 	defer stopListeningForFlushSignal(flushRequested) | 	defer stopListeningForFlushSignal(flushRequested) | ||||||
| 
 | 
 | ||||||
| 	// align to round interval
 |  | ||||||
| 	if a.Config.Agent.RoundInterval { |  | ||||||
| 		err := internal.SleepContext( |  | ||||||
| 			ctx, internal.AlignDuration(startTime, interval)) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// since we are watching two channels we need a ticker with the jitter
 |  | ||||||
| 	// integrated.
 |  | ||||||
| 	ticker := NewTicker(interval, jitter) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
| 
 |  | ||||||
| 	for { | 	for { | ||||||
| 		// Favor shutdown over other methods.
 | 		// Favor shutdown over other methods.
 | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			logError(a.flushOnce(output, interval, output.Write)) | 			logError(a.flushOnce(output, ticker, output.Write)) | ||||||
| 			return | 			return | ||||||
| 		default: | 		default: | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			logError(a.flushOnce(output, interval, output.Write)) | 			logError(a.flushOnce(output, ticker, output.Write)) | ||||||
| 			return | 			return | ||||||
| 		case <-ticker.C: | 		case <-ticker.Elapsed(): | ||||||
| 			logError(a.flushOnce(output, interval, output.Write)) | 			logError(a.flushOnce(output, ticker, output.Write)) | ||||||
| 		case <-flushRequested: | 		case <-flushRequested: | ||||||
| 			logError(a.flushOnce(output, interval, output.Write)) | 			logError(a.flushOnce(output, ticker, output.Write)) | ||||||
| 		case <-output.BatchReady: | 		case <-output.BatchReady: | ||||||
| 			// Favor the ticker over batch ready
 | 			// Favor the ticker over batch ready
 | ||||||
| 			select { | 			select { | ||||||
| 			case <-ticker.C: | 			case <-ticker.Elapsed(): | ||||||
| 				logError(a.flushOnce(output, interval, output.Write)) | 				logError(a.flushOnce(output, ticker, output.Write)) | ||||||
| 			default: | 			default: | ||||||
| 				logError(a.flushOnce(output, interval, output.WriteBatch)) | 				logError(a.flushOnce(output, ticker, output.WriteBatch)) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | @ -605,12 +577,9 @@ func (a *Agent) flushLoop( | ||||||
| // interval it fails to complete before.
 | // interval it fails to complete before.
 | ||||||
| func (a *Agent) flushOnce( | func (a *Agent) flushOnce( | ||||||
| 	output *models.RunningOutput, | 	output *models.RunningOutput, | ||||||
| 	timeout time.Duration, | 	ticker Ticker, | ||||||
| 	writeFunc func() error, | 	writeFunc func() error, | ||||||
| ) error { | ) error { | ||||||
| 	ticker := time.NewTicker(timeout) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
| 
 |  | ||||||
| 	done := make(chan error) | 	done := make(chan error) | ||||||
| 	go func() { | 	go func() { | ||||||
| 		done <- writeFunc() | 		done <- writeFunc() | ||||||
|  | @ -621,7 +590,7 @@ func (a *Agent) flushOnce( | ||||||
| 		case err := <-done: | 		case err := <-done: | ||||||
| 			output.LogBufferStatus() | 			output.LogBufferStatus() | ||||||
| 			return err | 			return err | ||||||
| 		case <-ticker.C: | 		case <-ticker.Elapsed(): | ||||||
| 			log.Printf("W! [agent] [%q] did not complete within its flush interval", | 			log.Printf("W! [agent] [%q] did not complete within its flush interval", | ||||||
| 				output.LogName()) | 				output.LogName()) | ||||||
| 			output.LogBufferStatus() | 			output.LogBufferStatus() | ||||||
|  |  | ||||||
							
								
								
									
										259
									
								
								agent/tick.go
								
								
								
								
							
							
						
						
									
										259
									
								
								agent/tick.go
								
								
								
								
							|  | @ -5,53 +5,264 @@ import ( | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/benbjohnson/clock" | ||||||
| 	"github.com/influxdata/telegraf/internal" | 	"github.com/influxdata/telegraf/internal" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type Ticker struct { | type empty struct{} | ||||||
| 	C          chan time.Time | 
 | ||||||
| 	ticker     *time.Ticker | type Ticker interface { | ||||||
| 	jitter     time.Duration | 	Elapsed() <-chan time.Time | ||||||
| 	wg         sync.WaitGroup | 	Stop() | ||||||
| 	cancelFunc context.CancelFunc |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewTicker( | // AlignedTicker delivers ticks at aligned times plus an optional jitter.  Each
 | ||||||
| 	interval time.Duration, | // tick is realigned to avoid drift and handle changes to the system clock.
 | ||||||
| 	jitter time.Duration, | //
 | ||||||
| ) *Ticker { | // The ticks may have an jitter duration applied to them as an random offset to
 | ||||||
| 	ctx, cancel := context.WithCancel(context.Background()) | // the interval.  However the overall pace of is that of the interval, so on
 | ||||||
|  | // average you will have one collection each interval.
 | ||||||
|  | //
 | ||||||
|  | // The first tick is emitted at the next alignment.
 | ||||||
|  | //
 | ||||||
|  | // Ticks are dropped for slow consumers.
 | ||||||
|  | //
 | ||||||
|  | // The implementation currently does not recalculate until the next tick with
 | ||||||
|  | // no maximum sleep, when using large intervals alignment is not corrected
 | ||||||
|  | // until the next tick.
 | ||||||
|  | type AlignedTicker struct { | ||||||
|  | 	interval time.Duration | ||||||
|  | 	jitter   time.Duration | ||||||
|  | 	ch       chan time.Time | ||||||
|  | 	cancel   context.CancelFunc | ||||||
|  | 	wg       sync.WaitGroup | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| 	t := &Ticker{ | func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker { | ||||||
| 		C:          make(chan time.Time, 1), | 	return newAlignedTicker(now, interval, jitter, clock.New()) | ||||||
| 		ticker:     time.NewTicker(interval), | } | ||||||
|  | 
 | ||||||
|  | func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	t := &AlignedTicker{ | ||||||
|  | 		interval: interval, | ||||||
| 		jitter:   jitter, | 		jitter:   jitter, | ||||||
| 		cancelFunc: cancel, | 		ch:       make(chan time.Time, 1), | ||||||
|  | 		cancel:   cancel, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	d := t.next(now) | ||||||
|  | 	timer := clock.Timer(d) | ||||||
|  | 
 | ||||||
| 	t.wg.Add(1) | 	t.wg.Add(1) | ||||||
| 	go t.relayTime(ctx) | 	go func() { | ||||||
|  | 		defer t.wg.Done() | ||||||
|  | 		t.run(ctx, timer) | ||||||
|  | 	}() | ||||||
| 
 | 
 | ||||||
| 	return t | 	return t | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (t *Ticker) Stop() { | func (t *AlignedTicker) next(now time.Time) time.Duration { | ||||||
| 	t.cancelFunc() | 	next := internal.AlignTime(now, t.interval) | ||||||
|  | 	d := next.Sub(now) | ||||||
|  | 	if d == 0 { | ||||||
|  | 		d = t.interval | ||||||
|  | 	} | ||||||
|  | 	d += internal.RandomDuration(t.jitter) | ||||||
|  | 	return d | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *AlignedTicker) run(ctx context.Context, timer *clock.Timer) { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			timer.Stop() | ||||||
|  | 			return | ||||||
|  | 		case now := <-timer.C: | ||||||
|  | 			select { | ||||||
|  | 			case t.ch <- now: | ||||||
|  | 			default: | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			d := t.next(now) | ||||||
|  | 			timer.Reset(d) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *AlignedTicker) Elapsed() <-chan time.Time { | ||||||
|  | 	return t.ch | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *AlignedTicker) Stop() { | ||||||
|  | 	t.cancel() | ||||||
| 	t.wg.Wait() | 	t.wg.Wait() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (t *Ticker) relayTime(ctx context.Context) { | // UnalignedTicker delivers ticks at regular but unaligned intervals.  No
 | ||||||
|  | // effort is made to avoid drift.
 | ||||||
|  | //
 | ||||||
|  | // The ticks may have an jitter duration applied to them as an random offset to
 | ||||||
|  | // the interval.  However the overall pace of is that of the interval, so on
 | ||||||
|  | // average you will have one collection each interval.
 | ||||||
|  | //
 | ||||||
|  | // The first tick is emitted immediately.
 | ||||||
|  | //
 | ||||||
|  | // Ticks are dropped for slow consumers.
 | ||||||
|  | type UnalignedTicker struct { | ||||||
|  | 	interval time.Duration | ||||||
|  | 	jitter   time.Duration | ||||||
|  | 	ch       chan time.Time | ||||||
|  | 	cancel   context.CancelFunc | ||||||
|  | 	wg       sync.WaitGroup | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker { | ||||||
|  | 	return newUnalignedTicker(interval, jitter, clock.New()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	t := &UnalignedTicker{ | ||||||
|  | 		interval: interval, | ||||||
|  | 		jitter:   jitter, | ||||||
|  | 		ch:       make(chan time.Time, 1), | ||||||
|  | 		cancel:   cancel, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ticker := clock.Ticker(t.interval) | ||||||
|  | 	t.ch <- clock.Now() | ||||||
|  | 
 | ||||||
|  | 	t.wg.Add(1) | ||||||
|  | 	go func() { | ||||||
| 		defer t.wg.Done() | 		defer t.wg.Done() | ||||||
|  | 		t.run(ctx, ticker, clock) | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	return t | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error { | ||||||
|  | 	if duration == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	t := clock.Timer(duration) | ||||||
|  | 	select { | ||||||
|  | 	case <-t.C: | ||||||
|  | 		return nil | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		t.Stop() | ||||||
|  | 		return ctx.Err() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) { | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case tm := <-t.ticker.C: | 		case <-ctx.Done(): | ||||||
| 			internal.SleepContext(ctx, internal.RandomDuration(t.jitter)) | 			ticker.Stop() | ||||||
|  | 			return | ||||||
|  | 		case <-ticker.C: | ||||||
|  | 			jitter := internal.RandomDuration(t.jitter) | ||||||
|  | 			err := sleep(ctx, jitter, clock) | ||||||
|  | 			if err != nil { | ||||||
|  | 				ticker.Stop() | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
| 			select { | 			select { | ||||||
| 			case t.C <- tm: | 			case t.ch <- clock.Now(): | ||||||
| 			default: | 			default: | ||||||
| 			} | 			} | ||||||
| 		case <-ctx.Done(): |  | ||||||
| 			return |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (t *UnalignedTicker) InjectTick() { | ||||||
|  | 	t.ch <- time.Now() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *UnalignedTicker) Elapsed() <-chan time.Time { | ||||||
|  | 	return t.ch | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *UnalignedTicker) Stop() { | ||||||
|  | 	t.cancel() | ||||||
|  | 	t.wg.Wait() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // RollingTicker delivers ticks at regular but unaligned intervals.
 | ||||||
|  | //
 | ||||||
|  | // Because the next interval is scheduled based on the interval + jitter, you
 | ||||||
|  | // are guaranteed at least interval seconds without missing a tick and ticks
 | ||||||
|  | // will be evenly scheduled over time.
 | ||||||
|  | //
 | ||||||
|  | // On average you will have one collection each interval + (jitter/2).
 | ||||||
|  | //
 | ||||||
|  | // The first tick is emitted after interval+jitter seconds.
 | ||||||
|  | //
 | ||||||
|  | // Ticks are dropped for slow consumers.
 | ||||||
|  | type RollingTicker struct { | ||||||
|  | 	interval time.Duration | ||||||
|  | 	jitter   time.Duration | ||||||
|  | 	ch       chan time.Time | ||||||
|  | 	cancel   context.CancelFunc | ||||||
|  | 	wg       sync.WaitGroup | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { | ||||||
|  | 	return newRollingTicker(interval, jitter, clock.New()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	t := &RollingTicker{ | ||||||
|  | 		interval: interval, | ||||||
|  | 		jitter:   jitter, | ||||||
|  | 		ch:       make(chan time.Time, 1), | ||||||
|  | 		cancel:   cancel, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	d := t.next() | ||||||
|  | 	timer := clock.Timer(d) | ||||||
|  | 
 | ||||||
|  | 	t.wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer t.wg.Done() | ||||||
|  | 		t.run(ctx, timer) | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	return t | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *RollingTicker) next() time.Duration { | ||||||
|  | 	return t.interval + internal.RandomDuration(t.jitter) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			timer.Stop() | ||||||
|  | 			return | ||||||
|  | 		case now := <-timer.C: | ||||||
|  | 			select { | ||||||
|  | 			case t.ch <- now: | ||||||
|  | 			default: | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			d := t.next() | ||||||
|  | 			timer.Reset(d) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *RollingTicker) Elapsed() <-chan time.Time { | ||||||
|  | 	return t.ch | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *RollingTicker) Stop() { | ||||||
|  | 	t.cancel() | ||||||
|  | 	t.wg.Wait() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -0,0 +1,251 @@ | ||||||
|  | package agent | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/benbjohnson/clock" | ||||||
|  | 	"github.com/stretchr/testify/require" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var format = "2006-01-02T15:04:05.999Z07:00" | ||||||
|  | 
 | ||||||
|  | func TestAlignedTicker(t *testing.T) { | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 0 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	since := clock.Now() | ||||||
|  | 	until := since.Add(60 * time.Second) | ||||||
|  | 
 | ||||||
|  | 	ticker := newAlignedTicker(since, interval, jitter, clock) | ||||||
|  | 
 | ||||||
|  | 	expected := []time.Time{ | ||||||
|  | 		time.Unix(10, 0).UTC(), | ||||||
|  | 		time.Unix(20, 0).UTC(), | ||||||
|  | 		time.Unix(30, 0).UTC(), | ||||||
|  | 		time.Unix(40, 0).UTC(), | ||||||
|  | 		time.Unix(50, 0).UTC(), | ||||||
|  | 		time.Unix(60, 0).UTC(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	actual := []time.Time{} | ||||||
|  | 	for !clock.Now().After(until) { | ||||||
|  | 		select { | ||||||
|  | 		case tm := <-ticker.Elapsed(): | ||||||
|  | 			actual = append(actual, tm.UTC()) | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 		clock.Add(10 * time.Second) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	require.Equal(t, expected, actual) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestAlignedTickerJitter(t *testing.T) { | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 5 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	since := clock.Now() | ||||||
|  | 	until := since.Add(60 * time.Second) | ||||||
|  | 
 | ||||||
|  | 	ticker := newAlignedTicker(since, interval, jitter, clock) | ||||||
|  | 
 | ||||||
|  | 	last := since | ||||||
|  | 	for !clock.Now().After(until) { | ||||||
|  | 		select { | ||||||
|  | 		case tm := <-ticker.Elapsed(): | ||||||
|  | 			require.True(t, tm.Sub(last) <= 15*time.Second) | ||||||
|  | 			require.True(t, tm.Sub(last) >= 5*time.Second) | ||||||
|  | 			last = last.Add(interval) | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 		clock.Add(5 * time.Second) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestAlignedTickerMissedTick(t *testing.T) { | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 0 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	since := clock.Now() | ||||||
|  | 
 | ||||||
|  | 	ticker := newAlignedTicker(since, interval, jitter, clock) | ||||||
|  | 
 | ||||||
|  | 	clock.Add(25 * time.Second) | ||||||
|  | 	tm := <-ticker.Elapsed() | ||||||
|  | 	require.Equal(t, time.Unix(10, 0).UTC(), tm.UTC()) | ||||||
|  | 	clock.Add(5 * time.Second) | ||||||
|  | 	tm = <-ticker.Elapsed() | ||||||
|  | 	require.Equal(t, time.Unix(30, 0).UTC(), tm.UTC()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestUnalignedTicker(t *testing.T) { | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 0 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	clock.Add(1 * time.Second) | ||||||
|  | 	since := clock.Now() | ||||||
|  | 	until := since.Add(60 * time.Second) | ||||||
|  | 
 | ||||||
|  | 	ticker := newUnalignedTicker(interval, jitter, clock) | ||||||
|  | 
 | ||||||
|  | 	expected := []time.Time{ | ||||||
|  | 		time.Unix(1, 0).UTC(), | ||||||
|  | 		time.Unix(11, 0).UTC(), | ||||||
|  | 		time.Unix(21, 0).UTC(), | ||||||
|  | 		time.Unix(31, 0).UTC(), | ||||||
|  | 		time.Unix(41, 0).UTC(), | ||||||
|  | 		time.Unix(51, 0).UTC(), | ||||||
|  | 		time.Unix(61, 0).UTC(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	actual := []time.Time{} | ||||||
|  | 	for !clock.Now().After(until) { | ||||||
|  | 		select { | ||||||
|  | 		case tm := <-ticker.Elapsed(): | ||||||
|  | 			actual = append(actual, tm.UTC()) | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 		clock.Add(10 * time.Second) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	require.Equal(t, expected, actual) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestRollingTicker(t *testing.T) { | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 0 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	clock.Add(1 * time.Second) | ||||||
|  | 	since := clock.Now() | ||||||
|  | 	until := since.Add(60 * time.Second) | ||||||
|  | 
 | ||||||
|  | 	ticker := newUnalignedTicker(interval, jitter, clock) | ||||||
|  | 
 | ||||||
|  | 	expected := []time.Time{ | ||||||
|  | 		time.Unix(1, 0).UTC(), | ||||||
|  | 		time.Unix(11, 0).UTC(), | ||||||
|  | 		time.Unix(21, 0).UTC(), | ||||||
|  | 		time.Unix(31, 0).UTC(), | ||||||
|  | 		time.Unix(41, 0).UTC(), | ||||||
|  | 		time.Unix(51, 0).UTC(), | ||||||
|  | 		time.Unix(61, 0).UTC(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	actual := []time.Time{} | ||||||
|  | 	for !clock.Now().After(until) { | ||||||
|  | 		select { | ||||||
|  | 		case tm := <-ticker.Elapsed(): | ||||||
|  | 			actual = append(actual, tm.UTC()) | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 		clock.Add(10 * time.Second) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	require.Equal(t, expected, actual) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Simulates running the Ticker for an hour and displays stats about the
 | ||||||
|  | // operation.
 | ||||||
|  | func TestAlignedTickerDistribution(t *testing.T) { | ||||||
|  | 	if testing.Short() { | ||||||
|  | 		t.Skip("skipping test in short mode.") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 5 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 	since := clock.Now() | ||||||
|  | 
 | ||||||
|  | 	ticker := newAlignedTicker(since, interval, jitter, clock) | ||||||
|  | 	dist := simulatedDist(ticker, clock) | ||||||
|  | 	printDist(dist) | ||||||
|  | 	require.True(t, 350 < dist.Count) | ||||||
|  | 	require.True(t, 9 < dist.Mean() && dist.Mean() < 11) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Simulates running the Ticker for an hour and displays stats about the
 | ||||||
|  | // operation.
 | ||||||
|  | func TestUnalignedTickerDistribution(t *testing.T) { | ||||||
|  | 	if testing.Short() { | ||||||
|  | 		t.Skip("skipping test in short mode.") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 5 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 
 | ||||||
|  | 	ticker := newUnalignedTicker(interval, jitter, clock) | ||||||
|  | 	dist := simulatedDist(ticker, clock) | ||||||
|  | 	printDist(dist) | ||||||
|  | 	require.True(t, 350 < dist.Count) | ||||||
|  | 	require.True(t, 9 < dist.Mean() && dist.Mean() < 11) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Simulates running the Ticker for an hour and displays stats about the
 | ||||||
|  | // operation.
 | ||||||
|  | func TestRollingTickerDistribution(t *testing.T) { | ||||||
|  | 	if testing.Short() { | ||||||
|  | 		t.Skip("skipping test in short mode.") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	interval := 10 * time.Second | ||||||
|  | 	jitter := 5 * time.Second | ||||||
|  | 
 | ||||||
|  | 	clock := clock.NewMock() | ||||||
|  | 
 | ||||||
|  | 	ticker := newRollingTicker(interval, jitter, clock) | ||||||
|  | 	dist := simulatedDist(ticker, clock) | ||||||
|  | 	printDist(dist) | ||||||
|  | 	require.True(t, 275 < dist.Count) | ||||||
|  | 	require.True(t, 12 < dist.Mean() && 13 > dist.Mean()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Distribution struct { | ||||||
|  | 	Buckets  [60]int | ||||||
|  | 	Count    int | ||||||
|  | 	Waittime float64 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *Distribution) Mean() float64 { | ||||||
|  | 	return d.Waittime / float64(d.Count) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func printDist(dist Distribution) { | ||||||
|  | 	for i, count := range dist.Buckets { | ||||||
|  | 		fmt.Printf("%2d %s\n", i, strings.Repeat("x", count)) | ||||||
|  | 	} | ||||||
|  | 	fmt.Printf("Average interval: %f\n", dist.Mean()) | ||||||
|  | 	fmt.Printf("Count: %d\n", dist.Count) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution { | ||||||
|  | 	since := clock.Now() | ||||||
|  | 	until := since.Add(1 * time.Hour) | ||||||
|  | 
 | ||||||
|  | 	var dist Distribution | ||||||
|  | 
 | ||||||
|  | 	last := clock.Now() | ||||||
|  | 	for !clock.Now().After(until) { | ||||||
|  | 		select { | ||||||
|  | 		case tm := <-ticker.Elapsed(): | ||||||
|  | 			dist.Buckets[tm.Second()] += 1 | ||||||
|  | 			dist.Count++ | ||||||
|  | 			dist.Waittime += tm.Sub(last).Seconds() | ||||||
|  | 			last = tm | ||||||
|  | 		default: | ||||||
|  | 			clock.Add(1 * time.Second) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return dist | ||||||
|  | } | ||||||
|  | @ -25,6 +25,7 @@ following works: | ||||||
| - github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE) | - github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE) | ||||||
| - github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING) | - github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING) | ||||||
| - github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) | - github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) | ||||||
|  | - github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE) | ||||||
| - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) | - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) | ||||||
| - github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE) | - github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE) | ||||||
| - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) | - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) | ||||||
|  |  | ||||||
							
								
								
									
										1
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										1
									
								
								go.mod
								
								
								
								
							|  | @ -26,6 +26,7 @@ require ( | ||||||
| 	github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 | 	github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 | ||||||
| 	github.com/armon/go-metrics v0.3.0 // indirect | 	github.com/armon/go-metrics v0.3.0 // indirect | ||||||
| 	github.com/aws/aws-sdk-go v1.30.9 | 	github.com/aws/aws-sdk-go v1.30.9 | ||||||
|  | 	github.com/benbjohnson/clock v1.0.0 | ||||||
| 	github.com/bitly/go-hostpool v0.1.0 // indirect | 	github.com/bitly/go-hostpool v0.1.0 // indirect | ||||||
| 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect | 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect | ||||||
| 	github.com/caio/go-tdigest v2.3.0+incompatible // indirect | 	github.com/caio/go-tdigest v2.3.0+incompatible // indirect | ||||||
|  |  | ||||||
							
								
								
									
										2
									
								
								go.sum
								
								
								
								
							
							
						
						
									
										2
									
								
								go.sum
								
								
								
								
							|  | @ -112,6 +112,8 @@ github.com/armon/go-metrics v0.3.0 h1:B7AQgHi8QSEi4uHu7Sbsga+IJDU+CENgjxoo81vDUq | ||||||
| github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= | github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= | ||||||
| github.com/aws/aws-sdk-go v1.30.9 h1:DntpBUKkchINPDbhEzDRin1eEn1TG9TZFlzWPf0i8to= | github.com/aws/aws-sdk-go v1.30.9 h1:DntpBUKkchINPDbhEzDRin1eEn1TG9TZFlzWPf0i8to= | ||||||
| github.com/aws/aws-sdk-go v1.30.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= | github.com/aws/aws-sdk-go v1.30.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= | ||||||
|  | github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w= | ||||||
|  | github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= | ||||||
| github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= | github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= | ||||||
| github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= | github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= | ||||||
| github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= | github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= | ||||||
|  |  | ||||||
|  | @ -5,12 +5,11 @@ import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"compress/gzip" | 	"compress/gzip" | ||||||
| 	"context" | 	"context" | ||||||
| 	"crypto/rand" |  | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"math" | 	"math" | ||||||
| 	"math/big" | 	"math/rand" | ||||||
| 	"os" | 	"os" | ||||||
| 	"os/exec" | 	"os/exec" | ||||||
| 	"runtime" | 	"runtime" | ||||||
|  | @ -211,12 +210,8 @@ func RandomSleep(max time.Duration, shutdown chan struct{}) { | ||||||
| 	if max == 0 { | 	if max == 0 { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	maxSleep := big.NewInt(max.Nanoseconds()) |  | ||||||
| 
 | 
 | ||||||
| 	var sleepns int64 | 	sleepns := rand.Int63n(max.Nanoseconds()) | ||||||
| 	if j, err := rand.Int(rand.Reader, maxSleep); err == nil { |  | ||||||
| 		sleepns = j.Int64() |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	t := time.NewTimer(time.Nanosecond * time.Duration(sleepns)) | 	t := time.NewTimer(time.Nanosecond * time.Duration(sleepns)) | ||||||
| 	select { | 	select { | ||||||
|  | @ -234,11 +229,7 @@ func RandomDuration(max time.Duration) time.Duration { | ||||||
| 		return 0 | 		return 0 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var sleepns int64 | 	sleepns := rand.Int63n(max.Nanoseconds()) | ||||||
| 	maxSleep := big.NewInt(max.Nanoseconds()) |  | ||||||
| 	if j, err := rand.Int(rand.Reader, maxSleep); err == nil { |  | ||||||
| 		sleepns = j.Int64() |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	return time.Duration(sleepns) | 	return time.Duration(sleepns) | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue