Refactor UDP & TCP input buffers

closes #991
This commit is contained in:
Cameron Sparr 2016-04-07 12:06:56 -06:00
parent 1bf904fe60
commit be379f3dac
3 changed files with 31 additions and 16 deletions

View File

@ -20,7 +20,7 @@ import (
const ( const (
// UDP packet limit, see // UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507 UDP_MAX_PACKET_SIZE int = 64 * 1024
defaultFieldName = "value" defaultFieldName = "value"
@ -57,8 +57,10 @@ type Statsd struct {
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool ParseDataDogTags bool
// UDPPacketSize is the size of the read packets for the server listening // UDPPacketSize is deprecated, it's only here for legacy support
// for statsd UDP packets. This will default to 1500 bytes. // we now always create 1 max size buffer and then copy only what we need
// into the in channel
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
sync.Mutex sync.Mutex
@ -272,7 +274,7 @@ func (s *Statsd) udpListen() error {
} }
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
buf := make([]byte, s.UDPPacketSize) buf := make([]byte, UDP_MAX_PACKET_SIZE)
for { for {
select { select {
case <-s.done: case <-s.done:
@ -283,9 +285,11 @@ func (s *Statsd) udpListen() error {
log.Printf("ERROR READ: %s\n", err.Error()) log.Printf("ERROR READ: %s\n", err.Error())
continue continue
} }
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])
select { select {
case s.in <- buf[:n]: case s.in <- bufCopy:
default: default:
log.Printf(dropwarn, string(buf[:n])) log.Printf(dropwarn, string(buf[:n]))
} }
@ -631,7 +635,6 @@ func init() {
inputs.Add("statsd", func() telegraf.Input { inputs.Add("statsd", func() telegraf.Input {
return &Statsd{ return &Statsd{
MetricSeparator: "_", MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE,
} }
}) })
} }

View File

@ -39,7 +39,7 @@ type TcpListener struct {
acc telegraf.Accumulator acc telegraf.Accumulator
} }
var dropwarn = "ERROR: Message queue full. Discarding metric. " + var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
const sampleConfig = ` const sampleConfig = `
@ -193,6 +193,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
t.forget(id) t.forget(id)
}() }()
var buf []byte
scanner := bufio.NewScanner(conn) scanner := bufio.NewScanner(conn)
for { for {
select { select {
@ -202,8 +203,15 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
if !scanner.Scan() { if !scanner.Scan() {
return return
} }
buf = scanner.Bytes()
if len(buf) == 0 {
continue
}
bufCopy := make([]byte, len(buf))
copy(bufCopy, buf)
select { select {
case t.in <- scanner.Bytes(): case t.in <- bufCopy:
default: default:
log.Printf(dropwarn) log.Printf(dropwarn)
} }

View File

@ -13,6 +13,10 @@ import (
type UdpListener struct { type UdpListener struct {
ServiceAddress string ServiceAddress string
// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
// into the in channel
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int AllowedPendingMessages int
@ -32,7 +36,7 @@ type UdpListener struct {
// UDP packet limit, see // UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507 const UDP_MAX_PACKET_SIZE int = 64 * 1024
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
@ -104,7 +108,7 @@ func (u *UdpListener) udpListen() error {
} }
log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, u.UDPPacketSize) buf := make([]byte, UDP_MAX_PACKET_SIZE)
for { for {
select { select {
case <-u.done: case <-u.done:
@ -115,11 +119,13 @@ func (u *UdpListener) udpListen() error {
log.Printf("ERROR: %s\n", err.Error()) log.Printf("ERROR: %s\n", err.Error())
continue continue
} }
bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])
select { select {
case u.in <- buf[:n]: case u.in <- bufCopy:
default: default:
log.Printf(dropwarn, string(buf[:n])) log.Printf(dropwarn, string(bufCopy))
} }
} }
} }
@ -155,8 +161,6 @@ func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
func init() { func init() {
inputs.Add("udp_listener", func() telegraf.Input { inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{ return &UdpListener{}
UDPPacketSize: UDP_PACKET_SIZE,
}
}) })
} }