Fix panic on connection loss with undelivered messages (#6806)
This commit is contained in:
parent
ef7fd9d030
commit
8b73625492
|
@ -187,6 +187,7 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
m.state = Disconnected
|
m.state = Disconnected
|
||||||
|
|
||||||
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
|
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
|
||||||
|
m.sem = make(semaphore, m.MaxUndeliveredMessages)
|
||||||
m.ctx, m.cancel = context.WithCancel(context.Background())
|
m.ctx, m.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
m.client = m.clientFactory(m.opts)
|
m.client = m.clientFactory(m.opts)
|
||||||
|
@ -215,7 +216,6 @@ func (m *MQTTConsumer) connect() error {
|
||||||
|
|
||||||
m.Log.Infof("Connected %v", m.Servers)
|
m.Log.Infof("Connected %v", m.Servers)
|
||||||
m.state = Connected
|
m.state = Connected
|
||||||
m.sem = make(semaphore, m.MaxUndeliveredMessages)
|
|
||||||
m.messages = make(map[telegraf.TrackingID]bool)
|
m.messages = make(map[telegraf.TrackingID]bool)
|
||||||
|
|
||||||
// Presistent sessions should skip subscription if a session is present, as
|
// Presistent sessions should skip subscription if a session is present, as
|
||||||
|
@ -254,12 +254,12 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case track := <-m.acc.Delivered():
|
case track := <-m.acc.Delivered():
|
||||||
|
<-m.sem
|
||||||
_, ok := m.messages[track.ID()]
|
_, ok := m.messages[track.ID()]
|
||||||
if !ok {
|
if !ok {
|
||||||
// Added by a previous connection
|
// Added by a previous connection
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
<-m.sem
|
|
||||||
// No ack, MQTT does not support durable handling
|
// No ack, MQTT does not support durable handling
|
||||||
delete(m.messages, track.ID())
|
delete(m.messages, track.ID())
|
||||||
case m.sem <- empty{}:
|
case m.sem <- empty{}:
|
||||||
|
|
Loading…
Reference in New Issue