From 60c8f382be18b9e8a0c13e2e476d568e7e164edc Mon Sep 17 00:00:00 2001 From: Matthew Crenshaw <3420325+sgtsquiggs@users.noreply.github.com> Date: Tue, 6 Aug 2019 14:29:29 -0400 Subject: [PATCH] Fix reload panic in socket_listener input plugin (#6218) --- .../inputs/socket_listener/socket_listener.go | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index d29cff582..a127a0738 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -37,6 +37,8 @@ type streamSocketListener struct { func (ssl *streamSocketListener) listen() { ssl.connections = map[string]net.Conn{} + wg := sync.WaitGroup{} + for { c, err := ssl.Accept() if err != nil { @@ -67,7 +69,11 @@ func (ssl *streamSocketListener) listen() { ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err)) } - go ssl.read(c) + wg.Add(1) + go func() { + defer wg.Done() + ssl.read(c) + }() } ssl.connectionsMtx.Lock() @@ -75,6 +81,8 @@ func (ssl *streamSocketListener) listen() { c.Close() } ssl.connectionsMtx.Unlock() + + wg.Wait() } func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error { @@ -169,6 +177,8 @@ type SocketListener struct { SocketMode string `toml:"socket_mode"` tlsint.ServerConfig + wg sync.WaitGroup + parsers.Parser telegraf.Accumulator io.Closer @@ -302,7 +312,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { } sl.Closer = ssl - go ssl.listen() + sl.wg = sync.WaitGroup{} + sl.wg.Add(1) + go func() { + defer sl.wg.Done() + ssl.listen() + }() case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": pc, err := udpListen(protocol, addr) if err != nil { @@ -336,7 +351,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { } sl.Closer = psl - go psl.listen() + sl.wg = sync.WaitGroup{} + sl.wg.Add(1) + go func() { + defer sl.wg.Done() + psl.listen() + }() default: return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress) } @@ -378,6 +398,7 @@ func (sl *SocketListener) Stop() { sl.Close() sl.Closer = nil } + sl.wg.Wait() } func newSocketListener() *SocketListener {