Add udp internal metrics for the statsd input (#6921)

This commit is contained in:
Daniel Nelson 2020-01-22 15:28:41 -08:00 committed by GitHub
parent ec35f07770
commit 9243ae9f51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 6 deletions

View File

@ -122,8 +122,12 @@ type Statsd struct {
MaxConnections selfstat.Stat MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat TCPPacketsRecv selfstat.Stat
BytesRecv selfstat.Stat TCPBytesRecv selfstat.Stat
UDPPacketsRecv selfstat.Stat
UDPPacketsDrop selfstat.Stat
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
Log telegraf.Logger Log telegraf.Logger
@ -327,8 +331,12 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.MaxConnections.Set(int64(s.MaxTCPConnections)) s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.in = make(chan input, s.AllowedPendingMessages) s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{}) s.done = make(chan struct{})
@ -461,6 +469,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
} }
return err return err
} }
s.UDPPacketsRecv.Incr(1)
s.UDPBytesRecv.Incr(int64(n))
b := s.bufPool.Get().(*bytes.Buffer) b := s.bufPool.Get().(*bytes.Buffer)
b.Reset() b.Reset()
b.Write(buf[:n]) b.Write(buf[:n])
@ -470,6 +480,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
Time: time.Now(), Time: time.Now(),
Addr: addr.IP.String()}: Addr: addr.IP.String()}:
default: default:
s.UDPPacketsDrop.Incr(1)
s.drops++ s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
s.Log.Errorf("Statsd message queue full. "+ s.Log.Errorf("Statsd message queue full. "+
@ -490,6 +501,7 @@ func (s *Statsd) parser() error {
case <-s.done: case <-s.done:
return nil return nil
case in := <-s.in: case in := <-s.in:
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n") lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer) s.bufPool.Put(in.Buffer)
for _, line := range lines { for _, line := range lines {
@ -502,6 +514,8 @@ func (s *Statsd) parser() error {
s.parseStatsdLine(line) s.parseStatsdLine(line)
} }
} }
elapsed := time.Since(start)
s.ParseTimeNS.Set(elapsed.Nanoseconds())
} }
} }
} }
@ -834,8 +848,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
if n == 0 { if n == 0 {
continue continue
} }
s.BytesRecv.Incr(int64(n)) s.TCPBytesRecv.Incr(int64(n))
s.PacketsRecv.Incr(1) s.TCPPacketsRecv.Incr(1)
b := s.bufPool.Get().(*bytes.Buffer) b := s.bufPool.Get().(*bytes.Buffer)
b.Reset() b.Reset()