From 6df3f0fdae91881167c27589e12cb99d0867f65f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 17 Jan 2017 15:01:12 -0800 Subject: [PATCH] Run scheduled flushes in background doing this unblocks incoming metrics while waiting for a flush to take place. we have to create a semaphore so that we can 'skip' flushes that try to run while a flush is already running. closes #2262 --- CHANGELOG.md | 1 + agent/agent.go | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9c85953f..fd37abcb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Bugfixes - [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int. +- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection. ## v1.2 [2017-01-00] diff --git a/agent/agent.go b/agent/agent.go index ab64154e0..a9e42643a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -286,6 +286,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er }() ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + semaphore := make(chan struct{}, 1) for { select { case <-shutdown: @@ -295,8 +296,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er a.flush() return nil case <-ticker.C: - internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) - a.flush() + go func() { + select { + case semaphore <- struct{}{}: + internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) + a.flush() + <-semaphore + default: + // skipping this flush because one is already happening + log.Println("W! Skipping a scheduled flush because there is" + + " already a flush ongoing.") + } + }() case metric := <-metricC: // NOTE potential bottleneck here as we put each metric through the // processors serially.