Run scheduled flushes in background
doing this unblocks incoming metrics while waiting for a flush to take place. we have to create a semaphore so that we can 'skip' flushes that try to run while a flush is already running. closes #2262
This commit is contained in:
		
							parent
							
								
									22340ad984
								
							
						
					
					
						commit
						6df3f0fdae
					
				|  | @ -10,6 +10,7 @@ | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
| 
 | 
 | ||||||
| - [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int. | - [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int. | ||||||
|  | - [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection. | ||||||
| 
 | 
 | ||||||
| ## v1.2 [2017-01-00] | ## v1.2 [2017-01-00] | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -286,6 +286,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er | ||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) | 	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) | ||||||
|  | 	semaphore := make(chan struct{}, 1) | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-shutdown: | 		case <-shutdown: | ||||||
|  | @ -295,8 +296,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er | ||||||
| 			a.flush() | 			a.flush() | ||||||
| 			return nil | 			return nil | ||||||
| 		case <-ticker.C: | 		case <-ticker.C: | ||||||
|  | 			go func() { | ||||||
|  | 				select { | ||||||
|  | 				case semaphore <- struct{}{}: | ||||||
| 					internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) | 					internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) | ||||||
| 					a.flush() | 					a.flush() | ||||||
|  | 					<-semaphore | ||||||
|  | 				default: | ||||||
|  | 					// skipping this flush because one is already happening
 | ||||||
|  | 					log.Println("W! Skipping a scheduled flush because there is" + | ||||||
|  | 						" already a flush ongoing.") | ||||||
|  | 				} | ||||||
|  | 			}() | ||||||
| 		case metric := <-metricC: | 		case metric := <-metricC: | ||||||
| 			// NOTE potential bottleneck here as we put each metric through the
 | 			// NOTE potential bottleneck here as we put each metric through the
 | ||||||
| 			// processors serially.
 | 			// processors serially.
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue