Make the UDP input buffer only once

This commit is contained in:
Cameron Sparr 2016-04-05 10:37:21 -06:00 committed by Michele Fadda
parent fddb2a238e
commit 2cba540d87
3 changed files with 16 additions and 17 deletions

View File

@ -1,6 +1,7 @@
## v0.12.1 [unreleased] ## v0.12.1 [unreleased]
### Features ### Features
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
### Bugfixes ### Bugfixes
- [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name) - [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)

View File

@ -18,7 +18,9 @@ import (
) )
const ( const (
UDP_PACKET_SIZE int = 1500 // UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507
defaultFieldName = "value" defaultFieldName = "value"
@ -157,10 +159,6 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy ## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time. ## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000 percentile_limit = 1000
## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
` `
func (_ *Statsd) SampleConfig() string { func (_ *Statsd) SampleConfig() string {
@ -274,12 +272,12 @@ func (s *Statsd) udpListen() error {
} }
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
buf := make([]byte, s.UDPPacketSize)
for { for {
select { select {
case <-s.done: case <-s.done:
return nil return nil
default: default:
buf := make([]byte, s.UDPPacketSize)
n, _, err := s.listener.ReadFromUDP(buf) n, _, err := s.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error()) log.Printf("ERROR READ: %s\n", err.Error())
@ -300,11 +298,12 @@ func (s *Statsd) udpListen() error {
// single statsd metric into a struct. // single statsd metric into a struct.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
defer s.wg.Done() defer s.wg.Done()
var packet []byte
for { for {
select { select {
case <-s.done: case <-s.done:
return nil return nil
case packet := <-s.in: case packet = <-s.in:
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
@ -631,7 +630,7 @@ func (s *Statsd) Stop() {
func init() { func init() {
inputs.Add("statsd", func() telegraf.Input { inputs.Add("statsd", func() telegraf.Input {
return &Statsd{ return &Statsd{
ConvertNames: true, MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE, UDPPacketSize: UDP_PACKET_SIZE,
} }
}) })

View File

@ -30,7 +30,9 @@ type UdpListener struct {
listener *net.UDPConn listener *net.UDPConn
} }
const UDP_PACKET_SIZE int = 1500 // UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
@ -43,11 +45,6 @@ const sampleConfig = `
## UDP listener will start dropping packets. ## UDP listener will start dropping packets.
allowed_pending_messages = 10000 allowed_pending_messages = 10000
## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes, but can be as large as 65,535 bytes.
udp_packet_size = 1500
## Data format to consume. ## Data format to consume.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
@ -107,12 +104,12 @@ func (u *UdpListener) udpListen() error {
} }
log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, u.UDPPacketSize)
for { for {
select { select {
case <-u.done: case <-u.done:
return nil return nil
default: default:
buf := make([]byte, u.UDPPacketSize)
n, _, err := u.listener.ReadFromUDP(buf) n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error()) log.Printf("ERROR: %s\n", err.Error())
@ -130,11 +127,13 @@ func (u *UdpListener) udpListen() error {
func (u *UdpListener) udpParser() error { func (u *UdpListener) udpParser() error {
defer u.wg.Done() defer u.wg.Done()
var packet []byte
for { for {
select { select {
case <-u.done: case <-u.done:
return nil return nil
case packet := <-u.in: case packet = <-u.in:
metrics, err := u.parser.Parse(packet) metrics, err := u.parser.Parse(packet)
if err == nil { if err == nil {
u.storeMetrics(metrics) u.storeMetrics(metrics)