Resubscribe if not using persistent sessions
This commit is contained in:
parent
21c7378b61
commit
ce94e636bb
|
@ -46,6 +46,8 @@ type MQTTConsumer struct {
|
|||
|
||||
// keep the accumulator internally:
|
||||
acc telegraf.Accumulator
|
||||
|
||||
started bool false
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
|
@ -134,16 +136,19 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
func onConnect(c *MQTT.Client) {
|
||||
topics := make(map[string]byte)
|
||||
for _, topic := range m.Topics {
|
||||
topics[topic] = byte(m.QoS)
|
||||
}
|
||||
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
|
||||
subscribeToken.Wait()
|
||||
if subscribeToken.Error() != nil {
|
||||
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
|
||||
string(m.Topics), err.Error())
|
||||
}
|
||||
if (!m.PersistentSession || !m.started) {
|
||||
topics := make(map[string]byte)
|
||||
for _, topic := range m.Topics {
|
||||
topics[topic] = byte(m.QoS)
|
||||
}
|
||||
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
|
||||
subscribeToken.Wait()
|
||||
if subscribeToken.Error() != nil {
|
||||
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
|
||||
string(m.Topics), err.Error())
|
||||
}
|
||||
m.started = true;
|
||||
}
|
||||
}
|
||||
// receiver() reads all incoming messages from the consumer, and parses them into
|
||||
// influxdb metric points.
|
||||
|
@ -178,6 +183,7 @@ func (m *MQTTConsumer) Stop() {
|
|||
defer m.Unlock()
|
||||
close(m.done)
|
||||
m.client.Disconnect(200)
|
||||
m.started = false
|
||||
}
|
||||
|
||||
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
|
||||
|
|
Loading…
Reference in New Issue