diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 76dd25054..67ce29cd8 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -2,6 +2,7 @@ package statsd import ( "bufio" + "bytes" "errors" "fmt" "log" @@ -89,7 +90,7 @@ type Statsd struct { malformed int // Channel for all incoming statsd packets - in chan []byte + in chan *bytes.Buffer done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive @@ -121,6 +122,9 @@ type Statsd struct { TotalConnections selfstat.Stat PacketsRecv selfstat.Stat BytesRecv selfstat.Stat + + // A pool of byte slices to handle parsing + bufPool sync.Pool } // One statsd metric, form is :||@ @@ -281,9 +285,6 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { func (s *Statsd) Start(_ telegraf.Accumulator) error { // Make data structures - s.done = make(chan struct{}) - s.in = make(chan []byte, s.AllowedPendingMessages) - s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) @@ -302,10 +303,15 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) - s.in = make(chan []byte, s.AllowedPendingMessages) + s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) s.done = make(chan struct{}) s.accept = make(chan bool, s.MaxTCPConnections) s.conns = make(map[string]*net.TCPConn) + s.bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + } for i := 0; i < s.MaxTCPConnections; i++ { s.accept <- true } @@ -394,11 +400,12 @@ func (s *Statsd) udpListen() error { log.Printf("E! Error READ: %s\n", err.Error()) continue } - bufCopy := make([]byte, n) - copy(bufCopy, buf[:n]) + b := s.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Write(buf[:n]) select { - case s.in <- bufCopy: + case s.in <- b: default: s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { @@ -414,19 +421,19 @@ func (s *Statsd) udpListen() error { // single statsd metric into a struct. func (s *Statsd) parser() error { defer s.wg.Done() - var packet []byte for { select { case <-s.done: return nil - case packet = <-s.in: - lines := strings.Split(string(packet), "\n") + case buf := <-s.in: + lines := strings.Split(buf.String(), "\n") for _, line := range lines { line = strings.TrimSpace(line) if line != "" { s.parseStatsdLine(line) } } + s.bufPool.Put(buf) } } } @@ -774,12 +781,14 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { } s.BytesRecv.Incr(int64(n)) s.PacketsRecv.Incr(1) - bufCopy := make([]byte, n+1) - copy(bufCopy, scanner.Bytes()) - bufCopy[n] = '\n' + + b := s.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Write(scanner.Bytes()) + b.WriteByte('\n') select { - case s.in <- bufCopy: + case s.in <- b: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 91af51d67..1331e1f4b 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1,6 +1,7 @@ package statsd import ( + "bytes" "errors" "fmt" "net" @@ -16,8 +17,8 @@ const ( testMsg = "test.tcp.msg:100|c" ) -func newTestTcpListener() (*Statsd, chan []byte) { - in := make(chan []byte, 1500) +func newTestTcpListener() (*Statsd, chan *bytes.Buffer) { + in := make(chan *bytes.Buffer, 1500) listener := &Statsd{ Protocol: "tcp", ServiceAddress: ":8125", @@ -34,7 +35,7 @@ func NewTestStatsd() *Statsd { // Make data structures s.done = make(chan struct{}) - s.in = make(chan []byte, s.AllowedPendingMessages) + s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset)