Fix reload panic in socket_listener input plugin (#6218)
This commit is contained in:
parent
e65324d2c1
commit
60c8f382be
|
@ -37,6 +37,8 @@ type streamSocketListener struct {
|
|||
func (ssl *streamSocketListener) listen() {
|
||||
ssl.connections = map[string]net.Conn{}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for {
|
||||
c, err := ssl.Accept()
|
||||
if err != nil {
|
||||
|
@ -67,7 +69,11 @@ func (ssl *streamSocketListener) listen() {
|
|||
ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
|
||||
}
|
||||
|
||||
go ssl.read(c)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ssl.read(c)
|
||||
}()
|
||||
}
|
||||
|
||||
ssl.connectionsMtx.Lock()
|
||||
|
@ -75,6 +81,8 @@ func (ssl *streamSocketListener) listen() {
|
|||
c.Close()
|
||||
}
|
||||
ssl.connectionsMtx.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
|
||||
|
@ -169,6 +177,8 @@ type SocketListener struct {
|
|||
SocketMode string `toml:"socket_mode"`
|
||||
tlsint.ServerConfig
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
parsers.Parser
|
||||
telegraf.Accumulator
|
||||
io.Closer
|
||||
|
@ -302,7 +312,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
sl.Closer = ssl
|
||||
go ssl.listen()
|
||||
sl.wg = sync.WaitGroup{}
|
||||
sl.wg.Add(1)
|
||||
go func() {
|
||||
defer sl.wg.Done()
|
||||
ssl.listen()
|
||||
}()
|
||||
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
|
||||
pc, err := udpListen(protocol, addr)
|
||||
if err != nil {
|
||||
|
@ -336,7 +351,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
sl.Closer = psl
|
||||
go psl.listen()
|
||||
sl.wg = sync.WaitGroup{}
|
||||
sl.wg.Add(1)
|
||||
go func() {
|
||||
defer sl.wg.Done()
|
||||
psl.listen()
|
||||
}()
|
||||
default:
|
||||
return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
|
||||
}
|
||||
|
@ -378,6 +398,7 @@ func (sl *SocketListener) Stop() {
|
|||
sl.Close()
|
||||
sl.Closer = nil
|
||||
}
|
||||
sl.wg.Wait()
|
||||
}
|
||||
|
||||
func newSocketListener() *SocketListener {
|
||||
|
|
Loading…
Reference in New Issue