From 892abec025dd596cdcd69a03a0556b52301125cb Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 30 May 2016 23:24:42 +0100 Subject: [PATCH] Refactor collection_jitter and flush_jitter use a common function between collection_jitter and flush_jitter. which creates the same behavior between the two options. going forward, both jitters will be random sleeps that get re-evaluated at runtime for every interval (previously only collection_jitter did this) also fixes behavior so that both jitters will exit in the event of a process exit. closes #1296 --- CHANGELOG.md | 5 +++ agent/agent.go | 45 +++--------------------- agent/agent_test.go | 73 --------------------------------------- internal/config/config.go | 1 - internal/internal.go | 25 ++++++++++++++ internal/internal_test.go | 25 ++++++++++++++ 6 files changed, 59 insertions(+), 115 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fbcaf018..71d70e976 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ### Release Notes +- `flush_jitter` behavior has been changed. The random jitter will now be +evaluated at every flush interval, rather than once at startup. This makes it +consistent with the behavior of `collection_jitter`. + - All AWS plugins now utilize a standard mechanism for evaluating credentials. This allows all AWS plugins to support environment variables, shared credential files & profiles, and role assumptions. See the specific plugin README for @@ -31,6 +35,7 @@ time before a new metric is included by the plugin. - [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified - [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second. - [#1278](https://github.com/influxdata/telegraf/pull/1278) & [#1288](https://github.com/influxdata/telegraf/pull/1288) & [#1295](https://github.com/influxdata/telegraf/pull/1295): RabbitMQ/Apache/InfluxDB inputs: made url(s) parameter optional by using reasonable input defaults if not specified +- [#1296](https://github.com/influxdata/telegraf/issues/1296): Refactor of flush_jitter argument. ### Bugfixes diff --git a/agent/agent.go b/agent/agent.go index 6b6714760..1423ef773 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,17 +1,15 @@ package agent import ( - cryptorand "crypto/rand" "fmt" "log" - "math/big" - "math/rand" "os" "runtime" "sync" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" ) @@ -115,27 +113,16 @@ func (a *Agent) gatherer( ticker := time.NewTicker(interval) defer ticker.Stop() - jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds() - for { var outerr error - start := time.Now() acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) - if jitter != 0 { - nanoSleep := rand.Int63n(jitter) - d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep)) - if err != nil { - log.Printf("Jittering collection interval failed for plugin %s", - input.Name) - } else { - time.Sleep(d) - } - } + internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) + start := time.Now() gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) @@ -274,6 +261,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er a.flush() return nil case <-ticker.C: + internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case m := <-metricC: for _, o := range a.Config.Outputs { @@ -283,35 +271,10 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er } } -// jitterInterval applies the the interval jitter to the flush interval using -// crypto/rand number generator -func jitterInterval(ininterval, injitter time.Duration) time.Duration { - var jitter int64 - outinterval := ininterval - if injitter.Nanoseconds() != 0 { - maxjitter := big.NewInt(injitter.Nanoseconds()) - if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil { - jitter = j.Int64() - } - outinterval = time.Duration(jitter + ininterval.Nanoseconds()) - } - - if outinterval.Nanoseconds() < time.Duration(500*time.Millisecond).Nanoseconds() { - log.Printf("Flush interval %s too low, setting to 500ms\n", outinterval) - outinterval = time.Duration(500 * time.Millisecond) - } - - return outinterval -} - // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup - a.Config.Agent.FlushInterval.Duration = jitterInterval( - a.Config.Agent.FlushInterval.Duration, - a.Config.Agent.FlushJitter.Duration) - log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+ "Flush Interval:%s \n", a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet, diff --git a/agent/agent_test.go b/agent/agent_test.go index adbde9a13..a5920ce1c 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -2,7 +2,6 @@ package agent import ( "testing" - "time" "github.com/influxdata/telegraf/internal/config" @@ -110,75 +109,3 @@ func TestAgent_LoadOutput(t *testing.T) { a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) } - -func TestAgent_ZeroJitter(t *testing.T) { - flushinterval := jitterInterval(time.Duration(10*time.Second), - time.Duration(0*time.Second)) - - actual := flushinterval.Nanoseconds() - exp := time.Duration(10 * time.Second).Nanoseconds() - - if actual != exp { - t.Errorf("Actual %v, expected %v", actual, exp) - } -} - -func TestAgent_ZeroInterval(t *testing.T) { - min := time.Duration(500 * time.Millisecond).Nanoseconds() - max := time.Duration(5 * time.Second).Nanoseconds() - - for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(0*time.Second), - time.Duration(5*time.Second)) - actual := flushinterval.Nanoseconds() - - if actual > max { - t.Errorf("Didn't expect interval %d to be > %d", actual, max) - break - } - if actual < min { - t.Errorf("Didn't expect interval %d to be < %d", actual, min) - break - } - } -} - -func TestAgent_ZeroBoth(t *testing.T) { - flushinterval := jitterInterval(time.Duration(0*time.Second), - time.Duration(0*time.Second)) - - actual := flushinterval - exp := time.Duration(500 * time.Millisecond) - - if actual != exp { - t.Errorf("Actual %v, expected %v", actual, exp) - } -} - -func TestAgent_JitterMax(t *testing.T) { - max := time.Duration(32 * time.Second).Nanoseconds() - - for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(30*time.Second), - time.Duration(2*time.Second)) - actual := flushinterval.Nanoseconds() - if actual > max { - t.Errorf("Didn't expect interval %d to be > %d", actual, max) - break - } - } -} - -func TestAgent_JitterMin(t *testing.T) { - min := time.Duration(30 * time.Second).Nanoseconds() - - for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(30*time.Second), - time.Duration(2*time.Second)) - actual := flushinterval.Nanoseconds() - if actual < min { - t.Errorf("Didn't expect interval %d to be < %d", actual, min) - break - } - } -} diff --git a/internal/config/config.go b/internal/config/config.go index daaaa10fc..545cec84d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,7 +58,6 @@ func NewConfig() *Config { Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, FlushInterval: internal.Duration{Duration: 10 * time.Second}, - FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, Tags: make(map[string]string), diff --git a/internal/internal.go b/internal/internal.go index 33ee40a26..27a24f021 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "log" + "math/big" "os" "os/exec" "strconv" @@ -228,3 +229,27 @@ func CompileFilter(filters []string) (glob.Glob, error) { } return out, err } + +// RandomSleep will sleep for a random amount of time up to max. +// If the shutdown channel is closed, it will return before it has finished +// sleeping. +func RandomSleep(max time.Duration, shutdown chan struct{}) { + if max == 0 { + return + } + maxSleep := big.NewInt(max.Nanoseconds()) + + var sleepns int64 + if j, err := rand.Int(rand.Reader, maxSleep); err == nil { + sleepns = j.Int64() + } + + t := time.NewTimer(time.Nanosecond * time.Duration(sleepns)) + select { + case <-t.C: + return + case <-shutdown: + t.Stop() + return + } +} diff --git a/internal/internal_test.go b/internal/internal_test.go index 341fdd370..31bb5ec61 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -137,3 +137,28 @@ func TestCompileFilter(t *testing.T) { assert.True(t, f.Match("mem")) assert.True(t, f.Match("network")) } + +func TestRandomSleep(t *testing.T) { + // test that zero max returns immediately + s := time.Now() + RandomSleep(time.Duration(0), make(chan struct{})) + elapsed := time.Since(s) + assert.True(t, elapsed < time.Millisecond) + + // test that max sleep is respected + s = time.Now() + RandomSleep(time.Millisecond*50, make(chan struct{})) + elapsed = time.Since(s) + assert.True(t, elapsed < time.Millisecond*50) + + // test that shutdown is respected + s = time.Now() + shutdown := make(chan struct{}) + go func() { + time.Sleep(time.Millisecond * 100) + close(shutdown) + }() + RandomSleep(time.Second, shutdown) + elapsed = time.Since(s) + assert.True(t, elapsed < time.Millisecond*150) +}