Handle onConnect

This commit is contained in:
Pascal Larin 2016-04-12 12:36:43 -04:00
parent f5878eafb9
commit 3648c85e16
1 changed files with 16 additions and 10 deletions

View File

@ -116,6 +116,9 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return err
}
opts.OnConnect = onConnect
m.client = mqtt.NewClient(opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
@ -124,21 +127,24 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.in = make(chan mqtt.Message, 1000)
m.done = make(chan struct{})
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
return subscribeToken.Error()
}
go m.receiver()
return nil
}
func onConnect(c *MQTT.Client) {
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
string(m.Topics), err.Error())
}
}
// receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (m *MQTTConsumer) receiver() {