diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 84687511e..69638af06 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -20,7 +20,7 @@ import ( const ( // UDP packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure - UDP_PACKET_SIZE int = 65507 + UDP_MAX_PACKET_SIZE int = 64 * 1024 defaultFieldName = "value" @@ -57,8 +57,10 @@ type Statsd struct { // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) ParseDataDogTags bool - // UDPPacketSize is the size of the read packets for the server listening - // for statsd UDP packets. This will default to 1500 bytes. + // UDPPacketSize is deprecated, it's only here for legacy support + // we now always create 1 max size buffer and then copy only what we need + // into the in channel + // see https://github.com/influxdata/telegraf/pull/992 UDPPacketSize int `toml:"udp_packet_size"` sync.Mutex @@ -272,7 +274,7 @@ func (s *Statsd) udpListen() error { } log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) - buf := make([]byte, s.UDPPacketSize) + buf := make([]byte, UDP_MAX_PACKET_SIZE) for { select { case <-s.done: @@ -283,9 +285,11 @@ func (s *Statsd) udpListen() error { log.Printf("ERROR READ: %s\n", err.Error()) continue } + bufCopy := make([]byte, n) + copy(bufCopy, buf[:n]) select { - case s.in <- buf[:n]: + case s.in <- bufCopy: default: log.Printf(dropwarn, string(buf[:n])) } @@ -631,7 +635,6 @@ func init() { inputs.Add("statsd", func() telegraf.Input { return &Statsd{ MetricSeparator: "_", - UDPPacketSize: UDP_PACKET_SIZE, } }) } diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 4559a3bf5..6f3166456 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -39,7 +39,7 @@ type TcpListener struct { acc telegraf.Accumulator } -var dropwarn = "ERROR: Message queue full. Discarding metric. " + +var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " + "You may want to increase allowed_pending_messages in the config\n" const sampleConfig = ` @@ -193,6 +193,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { t.forget(id) }() + var buf []byte scanner := bufio.NewScanner(conn) for { select { @@ -202,8 +203,15 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { if !scanner.Scan() { return } + buf = scanner.Bytes() + if len(buf) == 0 { + continue + } + bufCopy := make([]byte, len(buf)) + copy(bufCopy, buf) + select { - case t.in <- scanner.Bytes(): + case t.in <- bufCopy: default: log.Printf(dropwarn) } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 442cf98b3..39249de37 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -12,7 +12,11 @@ import ( ) type UdpListener struct { - ServiceAddress string + ServiceAddress string + // UDPPacketSize is deprecated, it's only here for legacy support + // we now always create 1 max size buffer and then copy only what we need + // into the in channel + // see https://github.com/influxdata/telegraf/pull/992 UDPPacketSize int `toml:"udp_packet_size"` AllowedPendingMessages int @@ -32,7 +36,7 @@ type UdpListener struct { // UDP packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure -const UDP_PACKET_SIZE int = 65507 +const UDP_MAX_PACKET_SIZE int = 64 * 1024 var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + "You may want to increase allowed_pending_messages in the config\n" @@ -104,7 +108,7 @@ func (u *UdpListener) udpListen() error { } log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) - buf := make([]byte, u.UDPPacketSize) + buf := make([]byte, UDP_MAX_PACKET_SIZE) for { select { case <-u.done: @@ -115,11 +119,13 @@ func (u *UdpListener) udpListen() error { log.Printf("ERROR: %s\n", err.Error()) continue } + bufCopy := make([]byte, n) + copy(bufCopy, buf[:n]) select { - case u.in <- buf[:n]: + case u.in <- bufCopy: default: - log.Printf(dropwarn, string(buf[:n])) + log.Printf(dropwarn, string(bufCopy)) } } } @@ -155,8 +161,6 @@ func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error { func init() { inputs.Add("udp_listener", func() telegraf.Input { - return &UdpListener{ - UDPPacketSize: UDP_PACKET_SIZE, - } + return &UdpListener{} }) }