From f816b952cf076f467ba4b82ee0a31cd99dac21a9 Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Tue, 15 Nov 2016 03:49:48 -0600 Subject: [PATCH] Add udp_buffer_size option to udp_listener (#1883) * patching udp_listener for fun updating with errcode adding debug flags to temp msgs moving from debug to info * updating PR 1883 based on feedback --- plugins/inputs/udp_listener/udp_listener.go | 39 ++++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index f8dff5269..78687feee 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -11,14 +11,26 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" ) +// UdpListener main struct for the collector type UdpListener struct { ServiceAddress string + + // UDPBufferSize should only be set if you want/need the telegraf UDP socket to + // differ from the system setting. In cases where you set the rmem_default to a lower + // value at the host level, but need a larger buffer for UDP bursty traffic, this + // setting enables you to configure that value ONLY for telegraf UDP sockets on this listener + // Set this to 0 (or comment out) to take system default + // + // NOTE: You should ensure that your rmem_max is >= to this setting to work properly! + // (e.g. sysctl -w net.core.rmem_max=N) + UDPBufferSize int `toml:"udp_buffer_size"` + AllowedPendingMessages int + // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need // into the in channel // see https://github.com/influxdata/telegraf/pull/992 - UDPPacketSize int `toml:"udp_packet_size"` - AllowedPendingMessages int + UDPPacketSize int `toml:"udp_packet_size"` sync.Mutex wg sync.WaitGroup @@ -38,7 +50,7 @@ type UdpListener struct { listener *net.UDPConn } -// UDP packet limit, see +// UDP_MAX_PACKET_SIZE is packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure const UDP_MAX_PACKET_SIZE int = 64 * 1024 @@ -57,6 +69,10 @@ const sampleConfig = ` ## UDP listener will start dropping packets. # allowed_pending_messages = 10000 + ## Set the buffer size of the UDP connection outside of OS default (in bytes) + ## If set to 0, take OS default + udp_buffer_size = 16777216 + ## Data format to consume. ## Each data format has it's own unique set of configuration options, read ## more about them here: @@ -94,7 +110,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { go u.udpListen() go u.udpParser() - log.Printf("I! Started UDP listener service on %s\n", u.ServiceAddress) + log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize) return nil } @@ -111,20 +127,33 @@ func (u *UdpListener) Stop() { func (u *UdpListener) udpListen() error { defer u.wg.Done() var err error + address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) u.listener, err = net.ListenUDP("udp", address) + if err != nil { - log.Fatalf("ERROR: ListenUDP - %s", err) + log.Fatalf("E! Error: ListenUDP - %s", err) } + log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String()) buf := make([]byte, UDP_MAX_PACKET_SIZE) + + if u.UDPBufferSize > 0 { + err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default + if err != nil { + log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err) + return err + } + } + for { select { case <-u.done: return nil default: u.listener.SetReadDeadline(time.Now().Add(time.Second)) + n, _, err := u.listener.ReadFromUDP(buf) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() {