Close UDP listener in udp_listener plugin

also adding waitgroups to udp_listener and statsd plugins to verify that
all goroutines have been cleaned up before Stop() exits.

closes #869
This commit is contained in:
Cameron Sparr 2016-03-17 10:16:12 -06:00 committed by Michele Fadda
parent 55e1d1d5ee
commit e4248bd7a6
3 changed files with 27 additions and 8 deletions

View File

@ -50,6 +50,7 @@ type Statsd struct {
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
sync.Mutex sync.Mutex
wg sync.WaitGroup
// Channel for all incoming statsd packets // Channel for all incoming statsd packets
in chan []byte in chan []byte
@ -238,6 +239,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings) s.timings = make(map[string]cachedtimings)
s.wg.Add(2)
// Start the UDP listener // Start the UDP listener
go s.udpListen() go s.udpListen()
// Start the line parser // Start the line parser
@ -248,13 +250,13 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
// udpListen starts listening for udp packets on the configured port. // udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error { func (s *Statsd) udpListen() error {
defer s.wg.Done()
var err error var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
s.listener, err = net.ListenUDP("udp", address) s.listener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
defer s.listener.Close()
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String()) log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
for { for {
@ -264,7 +266,7 @@ func (s *Statsd) udpListen() error {
default: default:
buf := make([]byte, s.UDPPacketSize) buf := make([]byte, s.UDPPacketSize)
n, _, err := s.listener.ReadFromUDP(buf) n, _, err := s.listener.ReadFromUDP(buf)
if err != nil { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error()) log.Printf("ERROR READ: %s\n", err.Error())
continue continue
} }
@ -282,6 +284,7 @@ func (s *Statsd) udpListen() error {
// packet into statsd strings and then calls parseStatsdLine, which parses a // packet into statsd strings and then calls parseStatsdLine, which parses a
// single statsd metric into a struct. // single statsd metric into a struct.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
defer s.wg.Done()
for { for {
select { select {
case <-s.done: case <-s.done:
@ -561,8 +564,9 @@ func (s *Statsd) Stop() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
log.Println("Stopping the statsd service") log.Println("Stopping the statsd service")
s.listener.Close()
close(s.done) close(s.done)
s.listener.Close()
s.wg.Wait()
close(s.in) close(s.in)
} }

View File

@ -3,6 +3,7 @@ package udp_listener
import ( import (
"log" "log"
"net" "net"
"strings"
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -14,7 +15,9 @@ type UdpListener struct {
ServiceAddress string ServiceAddress string
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int AllowedPendingMessages int
sync.Mutex sync.Mutex
wg sync.WaitGroup
in chan []byte in chan []byte
done chan struct{} done chan struct{}
@ -23,6 +26,8 @@ type UdpListener struct {
// Keep the accumulator in this struct // Keep the accumulator in this struct
acc telegraf.Accumulator acc telegraf.Accumulator
listener *net.UDPConn
} }
const UDP_PACKET_SIZE int = 1500 const UDP_PACKET_SIZE int = 1500
@ -76,6 +81,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
u.in = make(chan []byte, u.AllowedPendingMessages) u.in = make(chan []byte, u.AllowedPendingMessages)
u.done = make(chan struct{}) u.done = make(chan struct{})
u.wg.Add(2)
go u.udpListen() go u.udpListen()
go u.udpParser() go u.udpParser()
@ -87,18 +93,21 @@ func (u *UdpListener) Stop() {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
close(u.done) close(u.done)
u.listener.Close()
u.wg.Wait()
close(u.in) close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress) log.Println("Stopped UDP listener service on ", u.ServiceAddress)
} }
func (u *UdpListener) udpListen() error { func (u *UdpListener) udpListen() error {
defer u.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
listener, err := net.ListenUDP("udp", address) u.listener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
defer listener.Close() log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
log.Println("UDP server listening on: ", listener.LocalAddr().String())
for { for {
select { select {
@ -106,9 +115,10 @@ func (u *UdpListener) udpListen() error {
return nil return nil
default: default:
buf := make([]byte, u.UDPPacketSize) buf := make([]byte, u.UDPPacketSize)
n, _, err := listener.ReadFromUDP(buf) n, _, err := u.listener.ReadFromUDP(buf)
if err != nil { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error()) log.Printf("ERROR: %s\n", err.Error())
continue
} }
select { select {
@ -121,6 +131,7 @@ func (u *UdpListener) udpListen() error {
} }
func (u *UdpListener) udpParser() error { func (u *UdpListener) udpParser() error {
defer u.wg.Done()
for { for {
select { select {
case <-u.done: case <-u.done:

View File

@ -32,6 +32,7 @@ func TestRunParser(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser() listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -58,6 +59,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser() listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -78,6 +80,7 @@ func TestRunParserGraphiteMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -98,6 +101,7 @@ func TestRunParserJSONMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg