Resubscribe if not using persistent sessions

This commit is contained in:
Pascal Larin 2016-04-12 21:43:25 -04:00
parent 3648c85e16
commit b151a79fa5
1 changed files with 16 additions and 10 deletions

View File

@ -46,6 +46,8 @@ type MQTTConsumer struct {
// keep the accumulator internally: // keep the accumulator internally:
acc telegraf.Accumulator acc telegraf.Accumulator
started bool false
} }
var sampleConfig = ` var sampleConfig = `
@ -134,16 +136,19 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func onConnect(c *MQTT.Client) { func onConnect(c *MQTT.Client) {
topics := make(map[string]byte) if (!m.PersistentSession || !m.started) {
for _, topic := range m.Topics { topics := make(map[string]byte)
topics[topic] = byte(m.QoS) for _, topic := range m.Topics {
} topics[topic] = byte(m.QoS)
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) }
subscribeToken.Wait() subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
if subscribeToken.Error() != nil { subscribeToken.Wait()
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", if subscribeToken.Error() != nil {
string(m.Topics), err.Error()) log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
} string(m.Topics), err.Error())
}
m.started = true;
}
} }
// receiver() reads all incoming messages from the consumer, and parses them into // receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points. // influxdb metric points.
@ -178,6 +183,7 @@ func (m *MQTTConsumer) Stop() {
defer m.Unlock() defer m.Unlock()
close(m.done) close(m.done)
m.client.Disconnect(200) m.client.Disconnect(200)
m.started = false
} }
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {