diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 2a1631750..5f54f4bb4 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -187,6 +187,7 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.state = Disconnected m.acc = acc.WithTracking(m.MaxUndeliveredMessages) + m.sem = make(semaphore, m.MaxUndeliveredMessages) m.ctx, m.cancel = context.WithCancel(context.Background()) m.client = m.clientFactory(m.opts) @@ -215,7 +216,6 @@ func (m *MQTTConsumer) connect() error { m.Log.Infof("Connected %v", m.Servers) m.state = Connected - m.sem = make(semaphore, m.MaxUndeliveredMessages) m.messages = make(map[telegraf.TrackingID]bool) // Presistent sessions should skip subscription if a session is present, as @@ -254,12 +254,12 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) { for { select { case track := <-m.acc.Delivered(): + <-m.sem _, ok := m.messages[track.ID()] if !ok { // Added by a previous connection continue } - <-m.sem // No ack, MQTT does not support durable handling delete(m.messages, track.ID()) case m.sem <- empty{}: