diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index a4b70ffe3..d9f597bcf 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -15,6 +15,8 @@ import ( "github.com/influxdb/telegraf/plugins/inputs" ) +const UDP_PACKET_SIZE int = 1500 + var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + "You may want to increase allowed_pending_messages in the config\n" @@ -37,10 +39,14 @@ type Statsd struct { DeleteTimings bool ConvertNames bool + // UDPPacketSize is the size of the read packets for the server listening + // for statsd UDP packets. This will default to 1500 bytes. + UDPPacketSize int `toml:"udp_packet_size"` + sync.Mutex - // Channel for all incoming statsd messages - in chan string + // Channel for all incoming statsd packets + in chan []byte done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive @@ -58,13 +64,14 @@ func NewStatsd() *Statsd { // Make data structures s.done = make(chan struct{}) - s.in = make(chan string, s.AllowedPendingMessages) + s.in = make(chan []byte, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) s.timings = make(map[string]cachedtimings) s.ConvertNames = true + s.UDPPacketSize = UDP_PACKET_SIZE return &s } @@ -139,6 +146,10 @@ const sampleConfig = ` # calculation of percentiles. Raising this limit increases the accuracy # of percentiles but also increases the memory usage and cpu time. percentile_limit = 1000 + + # UDP packet size for the server to listen for. This will depend on the size + # of the packets that the client is sending, which is usually 1500 bytes. + udp_packet_size = 1500 ` func (_ *Statsd) SampleConfig() string { @@ -191,7 +202,7 @@ func (s *Statsd) Gather(acc inputs.Accumulator) error { func (s *Statsd) Start() error { // Make data structures s.done = make(chan struct{}) - s.in = make(chan string, s.AllowedPendingMessages) + s.in = make(chan []byte, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) @@ -220,36 +231,37 @@ func (s *Statsd) udpListen() error { case <-s.done: return nil default: - buf := make([]byte, 1024) + buf := make([]byte, s.UDPPacketSize) n, _, err := listener.ReadFromUDP(buf) if err != nil { log.Printf("ERROR: %s\n", err.Error()) } - lines := strings.Split(string(buf[:n]), "\n") - for _, line := range lines { - line = strings.TrimSpace(line) - if line != "" { - select { - case s.in <- line: - default: - log.Printf(dropwarn, line) - } - } + select { + case s.in <- buf[:n]: + default: + log.Printf(dropwarn, string(buf[:n])) } } } } -// parser monitors the s.in channel, if there is a line ready, it parses the -// statsd string into a usable metric struct and aggregates the value +// parser monitors the s.in channel, if there is a packet ready, it parses the +// packet into statsd strings and then calls parseStatsdLine, which parses a +// single statsd metric into a struct. func (s *Statsd) parser() error { for { select { case <-s.done: return nil - case line := <-s.in: - s.parseStatsdLine(line) + case packet := <-s.in: + lines := strings.Split(string(packet), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" { + s.parseStatsdLine(line) + } + } } } } @@ -499,6 +511,9 @@ func (s *Statsd) Stop() { func init() { inputs.Add("statsd", func() inputs.Input { - return &Statsd{ConvertNames: true} + return &Statsd{ + ConvertNames: true, + UDPPacketSize: UDP_PACKET_SIZE, + } }) }