Change default statsd packet size to 1500, make configurable
Also modifying the internal UDP listener/parser code to make it able to handle higher load. The udp listener will no longer do any parsing or string conversion. It will simply read UDP packets as bytes and put them into a channel. The parser thread will now deal with splitting the UDP metrics into separated strings. This could probably be made even better by leaving everything as byte arrays. fixes #543
This commit is contained in:
		
							parent
							
								
									6a50fceea4
								
							
						
					
					
						commit
						839651fadb
					
				|  | @ -15,6 +15,8 @@ import ( | ||||||
| 	"github.com/influxdb/telegraf/plugins/inputs" | 	"github.com/influxdb/telegraf/plugins/inputs" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | const UDP_PACKET_SIZE int = 1500 | ||||||
|  | 
 | ||||||
| var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + | var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + | ||||||
| 	"You may want to increase allowed_pending_messages in the config\n" | 	"You may want to increase allowed_pending_messages in the config\n" | ||||||
| 
 | 
 | ||||||
|  | @ -37,10 +39,14 @@ type Statsd struct { | ||||||
| 	DeleteTimings  bool | 	DeleteTimings  bool | ||||||
| 	ConvertNames   bool | 	ConvertNames   bool | ||||||
| 
 | 
 | ||||||
|  | 	// UDPPacketSize is the size of the read packets for the server listening
 | ||||||
|  | 	// for statsd UDP packets. This will default to 1500 bytes.
 | ||||||
|  | 	UDPPacketSize int `toml:"udp_packet_size"` | ||||||
|  | 
 | ||||||
| 	sync.Mutex | 	sync.Mutex | ||||||
| 
 | 
 | ||||||
| 	// Channel for all incoming statsd messages
 | 	// Channel for all incoming statsd packets
 | ||||||
| 	in   chan string | 	in   chan []byte | ||||||
| 	done chan struct{} | 	done chan struct{} | ||||||
| 
 | 
 | ||||||
| 	// Cache gauges, counters & sets so they can be aggregated as they arrive
 | 	// Cache gauges, counters & sets so they can be aggregated as they arrive
 | ||||||
|  | @ -58,13 +64,14 @@ func NewStatsd() *Statsd { | ||||||
| 
 | 
 | ||||||
| 	// Make data structures
 | 	// Make data structures
 | ||||||
| 	s.done = make(chan struct{}) | 	s.done = make(chan struct{}) | ||||||
| 	s.in = make(chan string, s.AllowedPendingMessages) | 	s.in = make(chan []byte, s.AllowedPendingMessages) | ||||||
| 	s.gauges = make(map[string]cachedgauge) | 	s.gauges = make(map[string]cachedgauge) | ||||||
| 	s.counters = make(map[string]cachedcounter) | 	s.counters = make(map[string]cachedcounter) | ||||||
| 	s.sets = make(map[string]cachedset) | 	s.sets = make(map[string]cachedset) | ||||||
| 	s.timings = make(map[string]cachedtimings) | 	s.timings = make(map[string]cachedtimings) | ||||||
| 
 | 
 | ||||||
| 	s.ConvertNames = true | 	s.ConvertNames = true | ||||||
|  | 	s.UDPPacketSize = UDP_PACKET_SIZE | ||||||
| 
 | 
 | ||||||
| 	return &s | 	return &s | ||||||
| } | } | ||||||
|  | @ -139,6 +146,10 @@ const sampleConfig = ` | ||||||
|   # calculation of percentiles. Raising this limit increases the accuracy |   # calculation of percentiles. Raising this limit increases the accuracy | ||||||
|   # of percentiles but also increases the memory usage and cpu time. |   # of percentiles but also increases the memory usage and cpu time. | ||||||
|   percentile_limit = 1000 |   percentile_limit = 1000 | ||||||
|  | 
 | ||||||
|  |   # UDP packet size for the server to listen for. This will depend on the size | ||||||
|  |   # of the packets that the client is sending, which is usually 1500 bytes. | ||||||
|  |   udp_packet_size = 1500 | ||||||
| ` | ` | ||||||
| 
 | 
 | ||||||
| func (_ *Statsd) SampleConfig() string { | func (_ *Statsd) SampleConfig() string { | ||||||
|  | @ -191,7 +202,7 @@ func (s *Statsd) Gather(acc inputs.Accumulator) error { | ||||||
| func (s *Statsd) Start() error { | func (s *Statsd) Start() error { | ||||||
| 	// Make data structures
 | 	// Make data structures
 | ||||||
| 	s.done = make(chan struct{}) | 	s.done = make(chan struct{}) | ||||||
| 	s.in = make(chan string, s.AllowedPendingMessages) | 	s.in = make(chan []byte, s.AllowedPendingMessages) | ||||||
| 	s.gauges = make(map[string]cachedgauge) | 	s.gauges = make(map[string]cachedgauge) | ||||||
| 	s.counters = make(map[string]cachedcounter) | 	s.counters = make(map[string]cachedcounter) | ||||||
| 	s.sets = make(map[string]cachedset) | 	s.sets = make(map[string]cachedset) | ||||||
|  | @ -220,36 +231,37 @@ func (s *Statsd) udpListen() error { | ||||||
| 		case <-s.done: | 		case <-s.done: | ||||||
| 			return nil | 			return nil | ||||||
| 		default: | 		default: | ||||||
| 			buf := make([]byte, 1024) | 			buf := make([]byte, s.UDPPacketSize) | ||||||
| 			n, _, err := listener.ReadFromUDP(buf) | 			n, _, err := listener.ReadFromUDP(buf) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Printf("ERROR: %s\n", err.Error()) | 				log.Printf("ERROR: %s\n", err.Error()) | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			lines := strings.Split(string(buf[:n]), "\n") | 			select { | ||||||
| 			for _, line := range lines { | 			case s.in <- buf[:n]: | ||||||
| 				line = strings.TrimSpace(line) | 			default: | ||||||
| 				if line != "" { | 				log.Printf(dropwarn, string(buf[:n])) | ||||||
| 					select { |  | ||||||
| 					case s.in <- line: |  | ||||||
| 					default: |  | ||||||
| 						log.Printf(dropwarn, line) |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // parser monitors the s.in channel, if there is a line ready, it parses the
 | // parser monitors the s.in channel, if there is a packet ready, it parses the
 | ||||||
| // statsd string into a usable metric struct and aggregates the value
 | // packet into statsd strings and then calls parseStatsdLine, which parses a
 | ||||||
|  | // single statsd metric into a struct.
 | ||||||
| func (s *Statsd) parser() error { | func (s *Statsd) parser() error { | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-s.done: | 		case <-s.done: | ||||||
| 			return nil | 			return nil | ||||||
| 		case line := <-s.in: | 		case packet := <-s.in: | ||||||
| 			s.parseStatsdLine(line) | 			lines := strings.Split(string(packet), "\n") | ||||||
|  | 			for _, line := range lines { | ||||||
|  | 				line = strings.TrimSpace(line) | ||||||
|  | 				if line != "" { | ||||||
|  | 					s.parseStatsdLine(line) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -499,6 +511,9 @@ func (s *Statsd) Stop() { | ||||||
| 
 | 
 | ||||||
| func init() { | func init() { | ||||||
| 	inputs.Add("statsd", func() inputs.Input { | 	inputs.Add("statsd", func() inputs.Input { | ||||||
| 		return &Statsd{ConvertNames: true} | 		return &Statsd{ | ||||||
|  | 			ConvertNames:  true, | ||||||
|  | 			UDPPacketSize: UDP_PACKET_SIZE, | ||||||
|  | 		} | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue