parent
							
								
									f3b553712a
								
							
						
					
					
						commit
						d3a5cca1bc
					
				|  | @ -13,6 +13,7 @@ | ||||||
| - [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain! | - [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain! | ||||||
| - [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod! | - [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod! | ||||||
| - [#551](https://github.com/influxdb/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable. | - [#551](https://github.com/influxdb/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable. | ||||||
|  | - [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering. | ||||||
| 
 | 
 | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
| - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! | - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! | ||||||
|  |  | ||||||
							
								
								
									
										19
									
								
								agent.go
								
								
								
								
							
							
						
						
									
										19
									
								
								agent.go
								
								
								
								
							|  | @ -1,10 +1,11 @@ | ||||||
| package telegraf | package telegraf | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"crypto/rand" | 	cryptorand "crypto/rand" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"log" | 	"log" | ||||||
| 	"math/big" | 	"math/big" | ||||||
|  | 	"math/rand" | ||||||
| 	"os" | 	"os" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  | @ -92,6 +93,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { | ||||||
| 
 | 
 | ||||||
| 	start := time.Now() | 	start := time.Now() | ||||||
| 	counter := 0 | 	counter := 0 | ||||||
|  | 	jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds() | ||||||
| 	for _, input := range a.Config.Inputs { | 	for _, input := range a.Config.Inputs { | ||||||
| 		if input.Config.Interval != 0 { | 		if input.Config.Interval != 0 { | ||||||
| 			continue | 			continue | ||||||
|  | @ -104,9 +106,19 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { | ||||||
| 
 | 
 | ||||||
| 			acc := NewAccumulator(input.Config, pointChan) | 			acc := NewAccumulator(input.Config, pointChan) | ||||||
| 			acc.SetDebug(a.Config.Agent.Debug) | 			acc.SetDebug(a.Config.Agent.Debug) | ||||||
| 			// acc.SetPrefix(input.Name + "_")
 |  | ||||||
| 			acc.SetDefaultTags(a.Config.Tags) | 			acc.SetDefaultTags(a.Config.Tags) | ||||||
| 
 | 
 | ||||||
|  | 			if jitter != 0 { | ||||||
|  | 				nanoSleep := rand.Int63n(jitter) | ||||||
|  | 				d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep)) | ||||||
|  | 				if err != nil { | ||||||
|  | 					log.Printf("Jittering collection interval failed for plugin %s", | ||||||
|  | 						input.Name) | ||||||
|  | 				} else { | ||||||
|  | 					time.Sleep(d) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
| 			if err := input.Input.Gather(acc); err != nil { | 			if err := input.Input.Gather(acc); err != nil { | ||||||
| 				log.Printf("Error in input [%s]: %s", input.Name, err) | 				log.Printf("Error in input [%s]: %s", input.Name, err) | ||||||
| 			} | 			} | ||||||
|  | @ -143,7 +155,6 @@ func (a *Agent) gatherSeparate( | ||||||
| 
 | 
 | ||||||
| 		acc := NewAccumulator(input.Config, pointChan) | 		acc := NewAccumulator(input.Config, pointChan) | ||||||
| 		acc.SetDebug(a.Config.Agent.Debug) | 		acc.SetDebug(a.Config.Agent.Debug) | ||||||
| 		// acc.SetPrefix(input.Name + "_")
 |  | ||||||
| 		acc.SetDefaultTags(a.Config.Tags) | 		acc.SetDefaultTags(a.Config.Tags) | ||||||
| 
 | 
 | ||||||
| 		if err := input.Input.Gather(acc); err != nil { | 		if err := input.Input.Gather(acc); err != nil { | ||||||
|  | @ -315,7 +326,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration { | ||||||
| 	outinterval := ininterval | 	outinterval := ininterval | ||||||
| 	if injitter.Nanoseconds() != 0 { | 	if injitter.Nanoseconds() != 0 { | ||||||
| 		maxjitter := big.NewInt(injitter.Nanoseconds()) | 		maxjitter := big.NewInt(injitter.Nanoseconds()) | ||||||
| 		if j, err := rand.Int(rand.Reader, maxjitter); err == nil { | 		if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil { | ||||||
| 			jitter = j.Int64() | 			jitter = j.Int64() | ||||||
| 		} | 		} | ||||||
| 		outinterval = time.Duration(jitter + ininterval.Nanoseconds()) | 		outinterval = time.Duration(jitter + ininterval.Nanoseconds()) | ||||||
|  |  | ||||||
|  | @ -61,13 +61,22 @@ type AgentConfig struct { | ||||||
| 	//     ie, if Interval=10s then always collect on :00, :10, :20, etc.
 | 	//     ie, if Interval=10s then always collect on :00, :10, :20, etc.
 | ||||||
| 	RoundInterval bool | 	RoundInterval bool | ||||||
| 
 | 
 | ||||||
|  | 	// CollectionJitter is used to jitter the collection by a random amount.
 | ||||||
|  | 	// Each plugin will sleep for a random time within jitter before collecting.
 | ||||||
|  | 	// This can be used to avoid many plugins querying things like sysfs at the
 | ||||||
|  | 	// same time, which can have a measurable effect on the system.
 | ||||||
|  | 	CollectionJitter internal.Duration | ||||||
|  | 
 | ||||||
| 	// Interval at which to flush data
 | 	// Interval at which to flush data
 | ||||||
| 	FlushInterval internal.Duration | 	FlushInterval internal.Duration | ||||||
| 
 | 
 | ||||||
| 	// FlushRetries is the number of times to retry each data flush
 | 	// FlushRetries is the number of times to retry each data flush
 | ||||||
| 	FlushRetries int | 	FlushRetries int | ||||||
| 
 | 
 | ||||||
| 	// FlushJitter tells
 | 	// FlushJitter Jitters the flush interval by a random amount.
 | ||||||
|  | 	// This is primarily to avoid large write spikes for users running a large
 | ||||||
|  | 	// number of telegraf instances.
 | ||||||
|  | 	// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
 | ||||||
| 	FlushJitter internal.Duration | 	FlushJitter internal.Duration | ||||||
| 
 | 
 | ||||||
| 	// TODO(cam): Remove UTC and Precision parameters, they are no longer
 | 	// TODO(cam): Remove UTC and Precision parameters, they are no longer
 | ||||||
|  | @ -271,6 +280,11 @@ var header = `# Telegraf configuration | ||||||
|   # Rounds collection interval to 'interval' |   # Rounds collection interval to 'interval' | ||||||
|   # ie, if interval="10s" then always collect on :00, :10, :20, etc. |   # ie, if interval="10s" then always collect on :00, :10, :20, etc. | ||||||
|   round_interval = true |   round_interval = true | ||||||
|  |   # Collection jitter is used to jitter the collection by a random amount. | ||||||
|  |   # Each plugin will sleep for a random time within jitter before collecting. | ||||||
|  |   # This can be used to avoid many plugins querying things like sysfs at the | ||||||
|  |   # same time, which can have a measurable effect on the system. | ||||||
|  |   collection_jitter = "0s" | ||||||
| 
 | 
 | ||||||
|   # Default data flushing interval for all outputs. You should not set this below |   # Default data flushing interval for all outputs. You should not set this below | ||||||
|   # interval. Maximum flush_interval will be flush_interval + flush_jitter |   # interval. Maximum flush_interval will be flush_interval + flush_jitter | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue