Improve statsd plugin perf by using a byte buffer pool (#3254)

This commit is contained in:
Agniva De Sarker 2017-09-25 23:25:02 +05:30 committed by Daniel Nelson
parent 48edce0170
commit c50e66e18f
2 changed files with 28 additions and 18 deletions

View File

@ -2,6 +2,7 @@ package statsd
import ( import (
"bufio" "bufio"
"bytes"
"errors" "errors"
"fmt" "fmt"
"log" "log"
@ -89,7 +90,7 @@ type Statsd struct {
malformed int malformed int
// Channel for all incoming statsd packets // Channel for all incoming statsd packets
in chan []byte in chan *bytes.Buffer
done chan struct{} done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive // Cache gauges, counters & sets so they can be aggregated as they arrive
@ -121,6 +122,9 @@ type Statsd struct {
TotalConnections selfstat.Stat TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat BytesRecv selfstat.Stat
// A pool of byte slices to handle parsing
bufPool sync.Pool
} }
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate> // One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
@ -281,9 +285,6 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
func (s *Statsd) Start(_ telegraf.Accumulator) error { func (s *Statsd) Start(_ telegraf.Accumulator) error {
// Make data structures // Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
@ -302,10 +303,15 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.in = make(chan []byte, s.AllowedPendingMessages) s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages)
s.done = make(chan struct{}) s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections) s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn) s.conns = make(map[string]*net.TCPConn)
s.bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
for i := 0; i < s.MaxTCPConnections; i++ { for i := 0; i < s.MaxTCPConnections; i++ {
s.accept <- true s.accept <- true
} }
@ -394,11 +400,12 @@ func (s *Statsd) udpListen() error {
log.Printf("E! Error READ: %s\n", err.Error()) log.Printf("E! Error READ: %s\n", err.Error())
continue continue
} }
bufCopy := make([]byte, n) b := s.bufPool.Get().(*bytes.Buffer)
copy(bufCopy, buf[:n]) b.Reset()
b.Write(buf[:n])
select { select {
case s.in <- bufCopy: case s.in <- b:
default: default:
s.drops++ s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
@ -414,19 +421,19 @@ func (s *Statsd) udpListen() error {
// single statsd metric into a struct. // single statsd metric into a struct.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
defer s.wg.Done() defer s.wg.Done()
var packet []byte
for { for {
select { select {
case <-s.done: case <-s.done:
return nil return nil
case packet = <-s.in: case buf := <-s.in:
lines := strings.Split(string(packet), "\n") lines := strings.Split(buf.String(), "\n")
for _, line := range lines { for _, line := range lines {
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
if line != "" { if line != "" {
s.parseStatsdLine(line) s.parseStatsdLine(line)
} }
} }
s.bufPool.Put(buf)
} }
} }
} }
@ -774,12 +781,14 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
} }
s.BytesRecv.Incr(int64(n)) s.BytesRecv.Incr(int64(n))
s.PacketsRecv.Incr(1) s.PacketsRecv.Incr(1)
bufCopy := make([]byte, n+1)
copy(bufCopy, scanner.Bytes()) b := s.bufPool.Get().(*bytes.Buffer)
bufCopy[n] = '\n' b.Reset()
b.Write(scanner.Bytes())
b.WriteByte('\n')
select { select {
case s.in <- bufCopy: case s.in <- b:
default: default:
s.drops++ s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {

View File

@ -1,6 +1,7 @@
package statsd package statsd
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@ -16,8 +17,8 @@ const (
testMsg = "test.tcp.msg:100|c" testMsg = "test.tcp.msg:100|c"
) )
func newTestTcpListener() (*Statsd, chan []byte) { func newTestTcpListener() (*Statsd, chan *bytes.Buffer) {
in := make(chan []byte, 1500) in := make(chan *bytes.Buffer, 1500)
listener := &Statsd{ listener := &Statsd{
Protocol: "tcp", Protocol: "tcp",
ServiceAddress: ":8125", ServiceAddress: ":8125",
@ -34,7 +35,7 @@ func NewTestStatsd() *Statsd {
// Make data structures // Make data structures
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages) s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)