Run scheduled flushes in background

doing this unblocks incoming metrics while waiting for a flush to take
place.

we have to create a semaphore so that we can
'skip' flushes that try to run while a flush is already running.

closes #2262
This commit is contained in:
Cameron Sparr 2017-01-17 15:01:12 -08:00
parent 4d72cd7c9f
commit e06f1e0323
2 changed files with 14 additions and 2 deletions

View File

@ -10,6 +10,7 @@
### Bugfixes ### Bugfixes
- [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int. - [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int.
- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection.
## v1.2 [2017-01-00] ## v1.2 [2017-01-00]

View File

@ -286,6 +286,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}() }()
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
semaphore := make(chan struct{}, 1)
for { for {
select { select {
case <-shutdown: case <-shutdown:
@ -295,8 +296,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
a.flush() a.flush()
return nil return nil
case <-ticker.C: case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) go func() {
a.flush() select {
case semaphore <- struct{}{}:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
<-semaphore
default:
// skipping this flush because one is already happening
log.Println("W! Skipping a scheduled flush because there is" +
" already a flush ongoing.")
}
}()
case metric := <-metricC: case metric := <-metricC:
// NOTE potential bottleneck here as we put each metric through the // NOTE potential bottleneck here as we put each metric through the
// processors serially. // processors serially.