Listen before leaving start in statsd (#5628)

This commit is contained in:
Daniel Nelson 2019-03-26 18:11:56 -07:00 committed by GitHub
parent 3bb1548414
commit 22ab649261
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 31 deletions

View File

@ -13,11 +13,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
@ -338,38 +337,64 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.MetricSeparator = defaultSeparator s.MetricSeparator = defaultSeparator
} }
s.wg.Add(2)
// Start the UDP listener
if s.isUDP() { if s.isUDP() {
go s.udpListen() address, err := net.ResolveUDPAddr(s.Protocol, s.ServiceAddress)
if err != nil {
return err
}
conn, err := net.ListenUDP(s.Protocol, address)
if err != nil {
return err
}
log.Println("I! Statsd UDP listener listening on: ", conn.LocalAddr().String())
s.UDPlistener = conn
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.udpListen(conn)
}()
} else { } else {
go s.tcpListen() address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress)
if err != nil {
return err
}
listener, err := net.ListenTCP("tcp", address)
if err != nil {
return err
}
log.Println("I! TCP Statsd listening on: ", listener.Addr().String())
s.TCPlistener = listener
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.tcpListen(listener)
}()
} }
// Start the line parser // Start the line parser
go s.parser() s.wg.Add(1)
go func() {
defer s.wg.Done()
s.parser()
}()
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
return nil return nil
} }
// tcpListen() starts listening for udp packets on the configured port. // tcpListen() starts listening for udp packets on the configured port.
func (s *Statsd) tcpListen() error { func (s *Statsd) tcpListen(listener *net.TCPListener) error {
defer s.wg.Done()
// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress)
s.TCPlistener, err = net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenTCP - %s", err)
return err
}
log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String())
for { for {
select { select {
case <-s.done: case <-s.done:
return nil return nil
default: default:
// Accept connection: // Accept connection:
conn, err := s.TCPlistener.AcceptTCP() conn, err := listener.AcceptTCP()
if err != nil { if err != nil {
return err return err
} }
@ -403,16 +428,7 @@ func (s *Statsd) tcpListen() error {
} }
// udpListen starts listening for udp packets on the configured port. // udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error { func (s *Statsd) udpListen(conn *net.UDPConn) error {
defer s.wg.Done()
var err error
address, _ := net.ResolveUDPAddr(s.Protocol, s.ServiceAddress)
s.UDPlistener, err = net.ListenUDP(s.Protocol, address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String())
if s.ReadBufferSize > 0 { if s.ReadBufferSize > 0 {
s.UDPlistener.SetReadBuffer(s.ReadBufferSize) s.UDPlistener.SetReadBuffer(s.ReadBufferSize)
} }
@ -423,7 +439,7 @@ func (s *Statsd) udpListen() error {
case <-s.done: case <-s.done:
return nil return nil
default: default:
n, _, err := s.UDPlistener.ReadFromUDP(buf) n, _, err := conn.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("E! Error READ: %s\n", err.Error()) log.Printf("E! Error READ: %s\n", err.Error())
continue continue
@ -448,7 +464,6 @@ func (s *Statsd) udpListen() error {
// packet into statsd strings and then calls parseStatsdLine, which parses a // packet into statsd strings and then calls parseStatsdLine, which parses a
// single statsd metric into a struct. // single statsd metric into a struct.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
defer s.wg.Done()
for { for {
select { select {
case <-s.done: case <-s.done: