diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 6b0dd0b78..8b5e15502 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -13,11 +13,10 @@ import ( "sync" "time" - "github.com/influxdata/telegraf/plugins/parsers/graphite" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/selfstat" ) @@ -338,38 +337,64 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.MetricSeparator = defaultSeparator } - s.wg.Add(2) - // Start the UDP listener if s.isUDP() { - go s.udpListen() + address, err := net.ResolveUDPAddr(s.Protocol, s.ServiceAddress) + if err != nil { + return err + } + + conn, err := net.ListenUDP(s.Protocol, address) + if err != nil { + return err + } + + log.Println("I! Statsd UDP listener listening on: ", conn.LocalAddr().String()) + s.UDPlistener = conn + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.udpListen(conn) + }() } else { - go s.tcpListen() + address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress) + if err != nil { + return err + } + listener, err := net.ListenTCP("tcp", address) + if err != nil { + return err + } + + log.Println("I! TCP Statsd listening on: ", listener.Addr().String()) + s.TCPlistener = listener + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.tcpListen(listener) + }() } + // Start the line parser - go s.parser() + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.parser() + }() log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) return nil } // tcpListen() starts listening for udp packets on the configured port. -func (s *Statsd) tcpListen() error { - defer s.wg.Done() - // Start listener - var err error - address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress) - s.TCPlistener, err = net.ListenTCP("tcp", address) - if err != nil { - log.Fatalf("ERROR: ListenTCP - %s", err) - return err - } - log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String()) +func (s *Statsd) tcpListen(listener *net.TCPListener) error { for { select { case <-s.done: return nil default: // Accept connection: - conn, err := s.TCPlistener.AcceptTCP() + conn, err := listener.AcceptTCP() if err != nil { return err } @@ -403,16 +428,7 @@ func (s *Statsd) tcpListen() error { } // udpListen starts listening for udp packets on the configured port. -func (s *Statsd) udpListen() error { - defer s.wg.Done() - var err error - address, _ := net.ResolveUDPAddr(s.Protocol, s.ServiceAddress) - s.UDPlistener, err = net.ListenUDP(s.Protocol, address) - if err != nil { - log.Fatalf("ERROR: ListenUDP - %s", err) - } - log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String()) - +func (s *Statsd) udpListen(conn *net.UDPConn) error { if s.ReadBufferSize > 0 { s.UDPlistener.SetReadBuffer(s.ReadBufferSize) } @@ -423,7 +439,7 @@ func (s *Statsd) udpListen() error { case <-s.done: return nil default: - n, _, err := s.UDPlistener.ReadFromUDP(buf) + n, _, err := conn.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("E! Error READ: %s\n", err.Error()) continue @@ -448,7 +464,6 @@ func (s *Statsd) udpListen() error { // packet into statsd strings and then calls parseStatsdLine, which parses a // single statsd metric into a struct. func (s *Statsd) parser() error { - defer s.wg.Done() for { select { case <-s.done: