diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index c64d2139b..0cba92eb6 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -116,6 +116,9 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return err } + opts.OnConnect = onConnect + + m.client = mqtt.NewClient(opts) if token := m.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() @@ -124,21 +127,24 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.in = make(chan mqtt.Message, 1000) m.done = make(chan struct{}) - topics := make(map[string]byte) - for _, topic := range m.Topics { - topics[topic] = byte(m.QoS) - } - subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) - subscribeToken.Wait() - if subscribeToken.Error() != nil { - return subscribeToken.Error() - } + go m.receiver() return nil } - + func onConnect(c *MQTT.Client) { + topics := make(map[string]byte) + for _, topic := range m.Topics { + topics[topic] = byte(m.QoS) + } + subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) + subscribeToken.Wait() + if subscribeToken.Error() != nil { + log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", + string(m.Topics), err.Error()) + } + } // receiver() reads all incoming messages from the consumer, and parses them into // influxdb metric points. func (m *MQTTConsumer) receiver() {