Add udp_buffer_size option to udp_listener (#1883)

* patching udp_listener for fun

updating with errcode

adding debug flags to temp msgs

moving from debug to info

* updating PR 1883 based on feedback
This commit is contained in:
Sebastian Borza 2016-11-15 03:49:48 -06:00 committed by Cameron Sparr
parent b905bc1b5d
commit f816b952cf
1 changed files with 34 additions and 5 deletions

View File

@ -11,14 +11,26 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)
// UdpListener main struct for the collector
type UdpListener struct {
ServiceAddress string
// UDPBufferSize should only be set if you want/need the telegraf UDP socket to
// differ from the system setting. In cases where you set the rmem_default to a lower
// value at the host level, but need a larger buffer for UDP bursty traffic, this
// setting enables you to configure that value ONLY for telegraf UDP sockets on this listener
// Set this to 0 (or comment out) to take system default
//
// NOTE: You should ensure that your rmem_max is >= to this setting to work properly!
// (e.g. sysctl -w net.core.rmem_max=N)
UDPBufferSize int `toml:"udp_buffer_size"`
AllowedPendingMessages int
// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
// into the in channel
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int
sync.Mutex
wg sync.WaitGroup
@ -38,7 +50,7 @@ type UdpListener struct {
listener *net.UDPConn
}
// UDP packet limit, see
// UDP_MAX_PACKET_SIZE is packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_MAX_PACKET_SIZE int = 64 * 1024
@ -57,6 +69,10 @@ const sampleConfig = `
## UDP listener will start dropping packets.
# allowed_pending_messages = 10000
## Set the buffer size of the UDP connection outside of OS default (in bytes)
## If set to 0, take OS default
udp_buffer_size = 16777216
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
@ -94,7 +110,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
go u.udpListen()
go u.udpParser()
log.Printf("I! Started UDP listener service on %s\n", u.ServiceAddress)
log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize)
return nil
}
@ -111,20 +127,33 @@ func (u *UdpListener) Stop() {
func (u *UdpListener) udpListen() error {
defer u.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
u.listener, err = net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
log.Fatalf("E! Error: ListenUDP - %s", err)
}
log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, UDP_MAX_PACKET_SIZE)
if u.UDPBufferSize > 0 {
err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default
if err != nil {
log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
return err
}
}
for {
select {
case <-u.done:
return nil
default:
u.listener.SetReadDeadline(time.Now().Add(time.Second))
n, _, err := u.listener.ReadFromUDP(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {