diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 0cba92eb6..72e0d3c19 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -46,6 +46,8 @@ type MQTTConsumer struct { // keep the accumulator internally: acc telegraf.Accumulator + + started bool false } var sampleConfig = ` @@ -134,16 +136,19 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return nil } func onConnect(c *MQTT.Client) { - topics := make(map[string]byte) - for _, topic := range m.Topics { - topics[topic] = byte(m.QoS) - } - subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) - subscribeToken.Wait() - if subscribeToken.Error() != nil { - log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", - string(m.Topics), err.Error()) - } + if (!m.PersistentSession || !m.started) { + topics := make(map[string]byte) + for _, topic := range m.Topics { + topics[topic] = byte(m.QoS) + } + subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) + subscribeToken.Wait() + if subscribeToken.Error() != nil { + log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", + string(m.Topics), err.Error()) + } + m.started = true; + } } // receiver() reads all incoming messages from the consumer, and parses them into // influxdb metric points. @@ -178,6 +183,7 @@ func (m *MQTTConsumer) Stop() { defer m.Unlock() close(m.done) m.client.Disconnect(200) + m.started = false } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {