diff --git a/CHANGELOG.md b/CHANGELOG.md index a742541ab..9c7d0f507 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.12.1 [unreleased] ### Features +- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs. ### Bugfixes - [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index d31e6bfc9..84687511e 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -18,7 +18,9 @@ import ( ) const ( - UDP_PACKET_SIZE int = 1500 + // UDP packet limit, see + // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure + UDP_PACKET_SIZE int = 65507 defaultFieldName = "value" @@ -157,10 +159,6 @@ const sampleConfig = ` ## calculation of percentiles. Raising this limit increases the accuracy ## of percentiles but also increases the memory usage and cpu time. percentile_limit = 1000 - - ## UDP packet size for the server to listen for. This will depend on the size - ## of the packets that the client is sending, which is usually 1500 bytes. - udp_packet_size = 1500 ` func (_ *Statsd) SampleConfig() string { @@ -274,12 +272,12 @@ func (s *Statsd) udpListen() error { } log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) + buf := make([]byte, s.UDPPacketSize) for { select { case <-s.done: return nil default: - buf := make([]byte, s.UDPPacketSize) n, _, err := s.listener.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("ERROR READ: %s\n", err.Error()) @@ -300,11 +298,12 @@ func (s *Statsd) udpListen() error { // single statsd metric into a struct. func (s *Statsd) parser() error { defer s.wg.Done() + var packet []byte for { select { case <-s.done: return nil - case packet := <-s.in: + case packet = <-s.in: lines := strings.Split(string(packet), "\n") for _, line := range lines { line = strings.TrimSpace(line) @@ -631,8 +630,8 @@ func (s *Statsd) Stop() { func init() { inputs.Add("statsd", func() telegraf.Input { return &Statsd{ - ConvertNames: true, - UDPPacketSize: UDP_PACKET_SIZE, + MetricSeparator: "_", + UDPPacketSize: UDP_PACKET_SIZE, } }) } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 794f1791d..442cf98b3 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -30,7 +30,9 @@ type UdpListener struct { listener *net.UDPConn } -const UDP_PACKET_SIZE int = 1500 +// UDP packet limit, see +// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure +const UDP_PACKET_SIZE int = 65507 var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + "You may want to increase allowed_pending_messages in the config\n" @@ -43,11 +45,6 @@ const sampleConfig = ` ## UDP listener will start dropping packets. allowed_pending_messages = 10000 - ## UDP packet size for the server to listen for. This will depend - ## on the size of the packets that the client is sending, which is - ## usually 1500 bytes, but can be as large as 65,535 bytes. - udp_packet_size = 1500 - ## Data format to consume. ## Each data format has it's own unique set of configuration options, read ## more about them here: @@ -107,12 +104,12 @@ func (u *UdpListener) udpListen() error { } log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) + buf := make([]byte, u.UDPPacketSize) for { select { case <-u.done: return nil default: - buf := make([]byte, u.UDPPacketSize) n, _, err := u.listener.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("ERROR: %s\n", err.Error()) @@ -130,11 +127,13 @@ func (u *UdpListener) udpListen() error { func (u *UdpListener) udpParser() error { defer u.wg.Done() + + var packet []byte for { select { case <-u.done: return nil - case packet := <-u.in: + case packet = <-u.in: metrics, err := u.parser.Parse(packet) if err == nil { u.storeMetrics(metrics)