Refactor collection_jitter and flush_jitter

use a common function between collection_jitter and flush_jitter. which
creates the same behavior between the two options.

going forward, both jitters will be random sleeps that get re-evaluated
at runtime for every interval (previously only collection_jitter did
this)

also fixes behavior so that both jitters will exit in the event of a
process exit.

closes #1296
This commit is contained in:
Cameron Sparr 2016-05-30 23:24:42 +01:00
parent e809c4e445
commit 892abec025
6 changed files with 59 additions and 115 deletions

View File

@ -2,6 +2,10 @@
### Release Notes ### Release Notes
- `flush_jitter` behavior has been changed. The random jitter will now be
evaluated at every flush interval, rather than once at startup. This makes it
consistent with the behavior of `collection_jitter`.
- All AWS plugins now utilize a standard mechanism for evaluating credentials. - All AWS plugins now utilize a standard mechanism for evaluating credentials.
This allows all AWS plugins to support environment variables, shared credential This allows all AWS plugins to support environment variables, shared credential
files & profiles, and role assumptions. See the specific plugin README for files & profiles, and role assumptions. See the specific plugin README for
@ -31,6 +35,7 @@ time before a new metric is included by the plugin.
- [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified - [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified
- [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second. - [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second.
- [#1278](https://github.com/influxdata/telegraf/pull/1278) & [#1288](https://github.com/influxdata/telegraf/pull/1288) & [#1295](https://github.com/influxdata/telegraf/pull/1295): RabbitMQ/Apache/InfluxDB inputs: made url(s) parameter optional by using reasonable input defaults if not specified - [#1278](https://github.com/influxdata/telegraf/pull/1278) & [#1288](https://github.com/influxdata/telegraf/pull/1288) & [#1295](https://github.com/influxdata/telegraf/pull/1295): RabbitMQ/Apache/InfluxDB inputs: made url(s) parameter optional by using reasonable input defaults if not specified
- [#1296](https://github.com/influxdata/telegraf/issues/1296): Refactor of flush_jitter argument.
### Bugfixes ### Bugfixes

View File

@ -1,17 +1,15 @@
package agent package agent
import ( import (
cryptorand "crypto/rand"
"fmt" "fmt"
"log" "log"
"math/big"
"math/rand"
"os" "os"
"runtime" "runtime"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
) )
@ -115,27 +113,16 @@ func (a *Agent) gatherer(
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for { for {
var outerr error var outerr error
start := time.Now()
acc := NewAccumulator(input.Config, metricC) acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags) acc.setDefaultTags(a.Config.Tags)
if jitter != 0 { internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}
start := time.Now()
gatherWithTimeout(shutdown, input, acc, interval) gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start) elapsed := time.Since(start)
@ -274,6 +261,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
a.flush() a.flush()
return nil return nil
case <-ticker.C: case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush() a.flush()
case m := <-metricC: case m := <-metricC:
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
@ -283,35 +271,10 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
} }
} }
// jitterInterval applies the the interval jitter to the flush interval using
// crypto/rand number generator
func jitterInterval(ininterval, injitter time.Duration) time.Duration {
var jitter int64
outinterval := ininterval
if injitter.Nanoseconds() != 0 {
maxjitter := big.NewInt(injitter.Nanoseconds())
if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil {
jitter = j.Int64()
}
outinterval = time.Duration(jitter + ininterval.Nanoseconds())
}
if outinterval.Nanoseconds() < time.Duration(500*time.Millisecond).Nanoseconds() {
log.Printf("Flush interval %s too low, setting to 500ms\n", outinterval)
outinterval = time.Duration(500 * time.Millisecond)
}
return outinterval
}
// Run runs the agent daemon, gathering every Interval // Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error { func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup var wg sync.WaitGroup
a.Config.Agent.FlushInterval.Duration = jitterInterval(
a.Config.Agent.FlushInterval.Duration,
a.Config.Agent.FlushJitter.Duration)
log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+ log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s \n", "Flush Interval:%s \n",
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet, a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,

View File

@ -2,7 +2,6 @@ package agent
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
@ -110,75 +109,3 @@ func TestAgent_LoadOutput(t *testing.T) {
a, _ = NewAgent(c) a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs)) assert.Equal(t, 3, len(a.Config.Outputs))
} }
func TestAgent_ZeroJitter(t *testing.T) {
flushinterval := jitterInterval(time.Duration(10*time.Second),
time.Duration(0*time.Second))
actual := flushinterval.Nanoseconds()
exp := time.Duration(10 * time.Second).Nanoseconds()
if actual != exp {
t.Errorf("Actual %v, expected %v", actual, exp)
}
}
func TestAgent_ZeroInterval(t *testing.T) {
min := time.Duration(500 * time.Millisecond).Nanoseconds()
max := time.Duration(5 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(5*time.Second))
actual := flushinterval.Nanoseconds()
if actual > max {
t.Errorf("Didn't expect interval %d to be > %d", actual, max)
break
}
if actual < min {
t.Errorf("Didn't expect interval %d to be < %d", actual, min)
break
}
}
}
func TestAgent_ZeroBoth(t *testing.T) {
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(0*time.Second))
actual := flushinterval
exp := time.Duration(500 * time.Millisecond)
if actual != exp {
t.Errorf("Actual %v, expected %v", actual, exp)
}
}
func TestAgent_JitterMax(t *testing.T) {
max := time.Duration(32 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual > max {
t.Errorf("Didn't expect interval %d to be > %d", actual, max)
break
}
}
}
func TestAgent_JitterMin(t *testing.T) {
min := time.Duration(30 * time.Second).Nanoseconds()
for i := 0; i < 1000; i++ {
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual < min {
t.Errorf("Didn't expect interval %d to be < %d", actual, min)
break
}
}
}

View File

@ -58,7 +58,6 @@ func NewConfig() *Config {
Interval: internal.Duration{Duration: 10 * time.Second}, Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true, RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushJitter: internal.Duration{Duration: 5 * time.Second},
}, },
Tags: make(map[string]string), Tags: make(map[string]string),

View File

@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"math/big"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@ -228,3 +229,27 @@ func CompileFilter(filters []string) (glob.Glob, error) {
} }
return out, err return out, err
} }
// RandomSleep will sleep for a random amount of time up to max.
// If the shutdown channel is closed, it will return before it has finished
// sleeping.
func RandomSleep(max time.Duration, shutdown chan struct{}) {
if max == 0 {
return
}
maxSleep := big.NewInt(max.Nanoseconds())
var sleepns int64
if j, err := rand.Int(rand.Reader, maxSleep); err == nil {
sleepns = j.Int64()
}
t := time.NewTimer(time.Nanosecond * time.Duration(sleepns))
select {
case <-t.C:
return
case <-shutdown:
t.Stop()
return
}
}

View File

@ -137,3 +137,28 @@ func TestCompileFilter(t *testing.T) {
assert.True(t, f.Match("mem")) assert.True(t, f.Match("mem"))
assert.True(t, f.Match("network")) assert.True(t, f.Match("network"))
} }
func TestRandomSleep(t *testing.T) {
// test that zero max returns immediately
s := time.Now()
RandomSleep(time.Duration(0), make(chan struct{}))
elapsed := time.Since(s)
assert.True(t, elapsed < time.Millisecond)
// test that max sleep is respected
s = time.Now()
RandomSleep(time.Millisecond*50, make(chan struct{}))
elapsed = time.Since(s)
assert.True(t, elapsed < time.Millisecond*50)
// test that shutdown is respected
s = time.Now()
shutdown := make(chan struct{})
go func() {
time.Sleep(time.Millisecond * 100)
close(shutdown)
}()
RandomSleep(time.Second, shutdown)
elapsed = time.Since(s)
assert.True(t, elapsed < time.Millisecond*150)
}