From a178aab9402121fbfac62a09695de3dfdffb1b40 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 17 Mar 2016 10:16:12 -0600 Subject: [PATCH] Close UDP listener in udp_listener plugin also adding waitgroups to udp_listener and statsd plugins to verify that all goroutines have been cleaned up before Stop() exits. closes #869 --- plugins/inputs/statsd/statsd.go | 10 ++++++--- plugins/inputs/udp_listener/udp_listener.go | 21 ++++++++++++++----- .../inputs/udp_listener/udp_listener_test.go | 4 ++++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index ba605baa4..943188353 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -50,6 +50,7 @@ type Statsd struct { UDPPacketSize int `toml:"udp_packet_size"` sync.Mutex + wg sync.WaitGroup // Channel for all incoming statsd packets in chan []byte @@ -238,6 +239,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.sets = make(map[string]cachedset) s.timings = make(map[string]cachedtimings) + s.wg.Add(2) // Start the UDP listener go s.udpListen() // Start the line parser @@ -248,13 +250,13 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { // udpListen starts listening for udp packets on the configured port. func (s *Statsd) udpListen() error { + defer s.wg.Done() var err error address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) s.listener, err = net.ListenUDP("udp", address) if err != nil { log.Fatalf("ERROR: ListenUDP - %s", err) } - defer s.listener.Close() log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) for { @@ -264,7 +266,7 @@ func (s *Statsd) udpListen() error { default: buf := make([]byte, s.UDPPacketSize) n, _, err := s.listener.ReadFromUDP(buf) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("ERROR READ: %s\n", err.Error()) continue } @@ -282,6 +284,7 @@ func (s *Statsd) udpListen() error { // packet into statsd strings and then calls parseStatsdLine, which parses a // single statsd metric into a struct. func (s *Statsd) parser() error { + defer s.wg.Done() for { select { case <-s.done: @@ -561,8 +564,9 @@ func (s *Statsd) Stop() { s.Lock() defer s.Unlock() log.Println("Stopping the statsd service") - s.listener.Close() close(s.done) + s.listener.Close() + s.wg.Wait() close(s.in) } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 7aac3160c..4b362c478 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -3,6 +3,7 @@ package udp_listener import ( "log" "net" + "strings" "sync" "github.com/influxdata/telegraf" @@ -14,7 +15,9 @@ type UdpListener struct { ServiceAddress string UDPPacketSize int `toml:"udp_packet_size"` AllowedPendingMessages int + sync.Mutex + wg sync.WaitGroup in chan []byte done chan struct{} @@ -23,6 +26,8 @@ type UdpListener struct { // Keep the accumulator in this struct acc telegraf.Accumulator + + listener *net.UDPConn } const UDP_PACKET_SIZE int = 1500 @@ -76,6 +81,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) + u.wg.Add(2) go u.udpListen() go u.udpParser() @@ -87,18 +93,21 @@ func (u *UdpListener) Stop() { u.Lock() defer u.Unlock() close(u.done) + u.listener.Close() + u.wg.Wait() close(u.in) log.Println("Stopped UDP listener service on ", u.ServiceAddress) } func (u *UdpListener) udpListen() error { + defer u.wg.Done() + var err error address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) - listener, err := net.ListenUDP("udp", address) + u.listener, err = net.ListenUDP("udp", address) if err != nil { log.Fatalf("ERROR: ListenUDP - %s", err) } - defer listener.Close() - log.Println("UDP server listening on: ", listener.LocalAddr().String()) + log.Println("UDP server listening on: ", u.listener.LocalAddr().String()) for { select { @@ -106,9 +115,10 @@ func (u *UdpListener) udpListen() error { return nil default: buf := make([]byte, u.UDPPacketSize) - n, _, err := listener.ReadFromUDP(buf) - if err != nil { + n, _, err := u.listener.ReadFromUDP(buf) + if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("ERROR: %s\n", err.Error()) + continue } select { @@ -121,6 +131,7 @@ func (u *UdpListener) udpListen() error { } func (u *UdpListener) udpParser() error { + defer u.wg.Done() for { select { case <-u.done: diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go index 2f0f6fae5..bdbab318b 100644 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -32,6 +32,7 @@ func TestRunParser(t *testing.T) { defer close(listener.done) listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) go listener.udpParser() in <- testmsg @@ -58,6 +59,7 @@ func TestRunParserInvalidMsg(t *testing.T) { defer close(listener.done) listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) go listener.udpParser() in <- testmsg @@ -78,6 +80,7 @@ func TestRunParserGraphiteMsg(t *testing.T) { defer close(listener.done) listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + listener.wg.Add(1) go listener.udpParser() in <- testmsg @@ -98,6 +101,7 @@ func TestRunParserJSONMsg(t *testing.T) { defer close(listener.done) listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.wg.Add(1) go listener.udpParser() in <- testmsg