diff --git a/CHANGELOG.md b/CHANGELOG.md index eb22d1896..6eebe7720 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,7 @@ even interval. This means that `interval="10s"` will collect every :00, :10, etc To ease scale concerns, flushing will be "jittered" by a random amount so that all Telegraf instances do not flush at the same time. Both of these options can be controlled via the `round_interval` and `flush_jitter` config options. -- Telegraf will now retry metric flushes, twice by default. This can be configued -via the `flush_retries` agent config option. +- Telegraf will now retry metric flushes twice ### Features - [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info diff --git a/agent.go b/agent.go index 8a5c47391..afde29467 100644 --- a/agent.go +++ b/agent.go @@ -1,9 +1,10 @@ package telegraf import ( + "crypto/rand" "fmt" "log" - "math/rand" + "math/big" "os" "sort" "sync" @@ -381,9 +382,10 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 100) + ticker := time.NewTicker(a.FlushInterval.Duration) points := make([]*client.Point, 0) - jitter := rand.Int63n(int64(a.FlushJitter.Duration)) + for { select { case <-shutdown: @@ -391,15 +393,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er a.flush(points, shutdown, true) return nil case <-ticker.C: - timer := time.NewTimer(time.Duration(jitter)) - select { - case <-timer.C: - a.flush(points, shutdown, false) - case <-shutdown: - log.Println("Hang on, flushing any cached points before shutdown") - a.flush(points, shutdown, true) - return nil - } + a.flush(points, shutdown, false) points = make([]*client.Point, 0) case pt := <-pointChan: points = append(points, pt) @@ -407,10 +401,38 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er } } +// jitterInterval applies the the interval jitter to the flush interval using +// crypto/rand number generator +func jitterInterval(ininterval, injitter time.Duration) time.Duration { + var jitter int64 + outinterval := ininterval + if injitter.Nanoseconds() != 0 { + maxjitter := big.NewInt(injitter.Nanoseconds()) + if j, err := rand.Int(rand.Reader, maxjitter); err == nil { + jitter = j.Int64() + } + outinterval = time.Duration(jitter + ininterval.Nanoseconds()) + } + + if outinterval.Nanoseconds() < time.Duration(500*time.Millisecond).Nanoseconds() { + log.Printf("Flush interval %s too low, setting to 500ms\n", outinterval) + outinterval = time.Duration(500 * time.Millisecond) + } + + return outinterval +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup + a.FlushInterval.Duration = jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + + log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+ + "Flush Interval:%s\n", + a.Interval, a.Debug, a.Hostname, a.FlushInterval) + // channel shared between all plugin threads for accumulating points pointChan := make(chan *client.Point, 1000) diff --git a/agent_test.go b/agent_test.go index 6cd5cddda..4d693cf65 100644 --- a/agent_test.go +++ b/agent_test.go @@ -3,6 +3,9 @@ package telegraf import ( "github.com/stretchr/testify/assert" "testing" + "time" + + "github.com/influxdb/telegraf/duration" // needing to load the plugins _ "github.com/influxdb/telegraf/plugins/all" @@ -55,3 +58,97 @@ func TestAgent_LoadOutput(t *testing.T) { outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo", "kafka", "bar"}, config) assert.Equal(t, 2, len(outputsEnabled)) } + +func TestAgent_ZeroJitter(t *testing.T) { + a := &Agent{ + FlushInterval: duration.Duration{10 * time.Second}, + FlushJitter: duration.Duration{0 * time.Second}, + } + flushinterval := jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + + actual := flushinterval.Nanoseconds() + exp := time.Duration(10 * time.Second).Nanoseconds() + + if actual != exp { + t.Errorf("Actual %v, expected %v", actual, exp) + } +} + +func TestAgent_ZeroInterval(t *testing.T) { + min := time.Duration(500 * time.Millisecond).Nanoseconds() + max := time.Duration(5 * time.Second).Nanoseconds() + + for i := 0; i < 1000; i++ { + a := &Agent{ + FlushInterval: duration.Duration{0 * time.Second}, + FlushJitter: duration.Duration{5 * time.Second}, + } + + flushinterval := jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + actual := flushinterval.Nanoseconds() + + if actual > max { + t.Errorf("Didn't expect interval %d to be > %d", actual, max) + break + } + if actual < min { + t.Errorf("Didn't expect interval %d to be < %d", actual, min) + break + } + } +} + +func TestAgent_ZeroBoth(t *testing.T) { + a := &Agent{ + FlushInterval: duration.Duration{0 * time.Second}, + FlushJitter: duration.Duration{0 * time.Second}, + } + + flushinterval := jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + + actual := flushinterval + exp := time.Duration(500 * time.Millisecond) + + if actual != exp { + t.Errorf("Actual %v, expected %v", actual, exp) + } +} + +func TestAgent_JitterMax(t *testing.T) { + max := time.Duration(32 * time.Second).Nanoseconds() + + for i := 0; i < 1000; i++ { + a := &Agent{ + FlushInterval: duration.Duration{30 * time.Second}, + FlushJitter: duration.Duration{2 * time.Second}, + } + flushinterval := jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + actual := flushinterval.Nanoseconds() + if actual > max { + t.Errorf("Didn't expect interval %d to be > %d", actual, max) + break + } + } +} + +func TestAgent_JitterMin(t *testing.T) { + min := time.Duration(30 * time.Second).Nanoseconds() + + for i := 0; i < 1000; i++ { + a := &Agent{ + FlushInterval: duration.Duration{30 * time.Second}, + FlushJitter: duration.Duration{2 * time.Second}, + } + flushinterval := jitterInterval(a.FlushInterval.Duration, + a.FlushJitter.Duration) + actual := flushinterval.Nanoseconds() + if actual < min { + t.Errorf("Didn't expect interval %d to be < %d", actual, min) + break + } + } +} diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index ceed90eaf..94e416861 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -143,9 +143,6 @@ func main() { log.Printf("Starting Telegraf (version %s)\n", Version) log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) - log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+ - "Flush Interval:%s\n", - ag.Interval, ag.Debug, ag.Hostname, ag.FlushInterval) log.Printf("Tags enabled: %s", config.ListTags()) if *fPidfile != "" { diff --git a/config.go b/config.go index 5f1cbdf96..e6b653015 100644 --- a/config.go +++ b/config.go @@ -230,13 +230,13 @@ var header = `# Telegraf configuration # ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true - # Default data flushing interval for all outputs + # Default data flushing interval for all outputs. You should not set this below + # interval. Maximum flush_interval will be flush_interval + flush_jitter flush_interval = "10s" - # Jitter the flush interval by a random range - # ie, a jitter of 5s and interval 10s means flush will happen every 10-15s - flush_jitter = "5s" - # Number of times to retry each data flush - flush_retries = 2 + # Jitter the flush interval by a random amount. This is primarily to avoid + # large write spikes for users running a large number of telegraf instances. + # ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + flush_jitter = "0s" # Run telegraf in debug mode debug = false