Close UDP listener in udp_listener plugin
also adding waitgroups to udp_listener and statsd plugins to verify that all goroutines have been cleaned up before Stop() exits. closes #869
This commit is contained in:
@@ -50,6 +50,7 @@ type Statsd struct {
|
||||
UDPPacketSize int `toml:"udp_packet_size"`
|
||||
|
||||
sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Channel for all incoming statsd packets
|
||||
in chan []byte
|
||||
@@ -238,6 +239,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
|
||||
s.sets = make(map[string]cachedset)
|
||||
s.timings = make(map[string]cachedtimings)
|
||||
|
||||
s.wg.Add(2)
|
||||
// Start the UDP listener
|
||||
go s.udpListen()
|
||||
// Start the line parser
|
||||
@@ -248,13 +250,13 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
|
||||
|
||||
// udpListen starts listening for udp packets on the configured port.
|
||||
func (s *Statsd) udpListen() error {
|
||||
defer s.wg.Done()
|
||||
var err error
|
||||
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
|
||||
s.listener, err = net.ListenUDP("udp", address)
|
||||
if err != nil {
|
||||
log.Fatalf("ERROR: ListenUDP - %s", err)
|
||||
}
|
||||
defer s.listener.Close()
|
||||
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
|
||||
|
||||
for {
|
||||
@@ -264,7 +266,7 @@ func (s *Statsd) udpListen() error {
|
||||
default:
|
||||
buf := make([]byte, s.UDPPacketSize)
|
||||
n, _, err := s.listener.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
if err != nil && !strings.Contains(err.Error(), "closed network") {
|
||||
log.Printf("ERROR READ: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
@@ -282,6 +284,7 @@ func (s *Statsd) udpListen() error {
|
||||
// packet into statsd strings and then calls parseStatsdLine, which parses a
|
||||
// single statsd metric into a struct.
|
||||
func (s *Statsd) parser() error {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
@@ -561,8 +564,9 @@ func (s *Statsd) Stop() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
log.Println("Stopping the statsd service")
|
||||
s.listener.Close()
|
||||
close(s.done)
|
||||
s.listener.Close()
|
||||
s.wg.Wait()
|
||||
close(s.in)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user