From 9243ae9f51508c877b9f34350f83d584234fe65c Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 22 Jan 2020 15:28:41 -0800 Subject: [PATCH] Add udp internal metrics for the statsd input (#6921) --- plugins/inputs/statsd/statsd.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index a0d3c9ee7..32b12a7e9 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -122,8 +122,12 @@ type Statsd struct { MaxConnections selfstat.Stat CurrentConnections selfstat.Stat TotalConnections selfstat.Stat - PacketsRecv selfstat.Stat - BytesRecv selfstat.Stat + TCPPacketsRecv selfstat.Stat + TCPBytesRecv selfstat.Stat + UDPPacketsRecv selfstat.Stat + UDPPacketsDrop selfstat.Stat + UDPBytesRecv selfstat.Stat + ParseTimeNS selfstat.Stat Log telegraf.Logger @@ -327,8 +331,12 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.MaxConnections.Set(int64(s.MaxTCPConnections)) s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) - s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) - s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) + s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) + s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) + s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags) + s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags) + s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags) + s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags) s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) @@ -461,6 +469,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { } return err } + s.UDPPacketsRecv.Incr(1) + s.UDPBytesRecv.Incr(int64(n)) b := s.bufPool.Get().(*bytes.Buffer) b.Reset() b.Write(buf[:n]) @@ -470,6 +480,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { Time: time.Now(), Addr: addr.IP.String()}: default: + s.UDPPacketsDrop.Incr(1) s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { s.Log.Errorf("Statsd message queue full. "+ @@ -490,6 +501,7 @@ func (s *Statsd) parser() error { case <-s.done: return nil case in := <-s.in: + start := time.Now() lines := strings.Split(in.Buffer.String(), "\n") s.bufPool.Put(in.Buffer) for _, line := range lines { @@ -502,6 +514,8 @@ func (s *Statsd) parser() error { s.parseStatsdLine(line) } } + elapsed := time.Since(start) + s.ParseTimeNS.Set(elapsed.Nanoseconds()) } } } @@ -834,8 +848,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { if n == 0 { continue } - s.BytesRecv.Incr(int64(n)) - s.PacketsRecv.Incr(1) + s.TCPBytesRecv.Incr(int64(n)) + s.TCPPacketsRecv.Incr(1) b := s.bufPool.Get().(*bytes.Buffer) b.Reset()