Added onConnection and connectionLost Handlers

This commit is contained in:
chaton78 2016-04-13 22:58:45 -04:00
parent b151a79fa5
commit edf04d788a
1 changed files with 19 additions and 13 deletions

View File

@ -3,6 +3,7 @@ package mqtt_consumer
import ( import (
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"time" "time"
@ -47,7 +48,7 @@ type MQTTConsumer struct {
// keep the accumulator internally: // keep the accumulator internally:
acc telegraf.Accumulator acc telegraf.Accumulator
started bool false started bool
} }
var sampleConfig = ` var sampleConfig = `
@ -102,6 +103,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.started = false
if m.PersistentSession && m.ClientID == "" { if m.PersistentSession && m.ClientID == "" {
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
@ -118,9 +120,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return err return err
} }
opts.OnConnect = onConnect
m.client = mqtt.NewClient(opts) m.client = mqtt.NewClient(opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil { if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error() return token.Error()
@ -129,14 +128,12 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.in = make(chan mqtt.Message, 1000) m.in = make(chan mqtt.Message, 1000)
m.done = make(chan struct{}) m.done = make(chan struct{})
go m.receiver() go m.receiver()
return nil return nil
} }
func onConnect(c *MQTT.Client) { func (m *MQTTConsumer) onConnect(c mqtt.Client) {
if (!m.PersistentSession || !m.started) { if !m.PersistentSession || !m.started {
topics := make(map[string]byte) topics := make(map[string]byte)
for _, topic := range m.Topics { for _, topic := range m.Topics {
topics[topic] = byte(m.QoS) topics[topic] = byte(m.QoS)
@ -145,11 +142,18 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
subscribeToken.Wait() subscribeToken.Wait()
if subscribeToken.Error() != nil { if subscribeToken.Error() != nil {
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
string(m.Topics), err.Error()) strings.Join(m.Topics[:], ","), subscribeToken.Error())
} }
m.started = true; m.started = true
} }
return
} }
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
log.Printf("MQTT Connection lost\nerror: %s\nClient should retry to reconnect", err.Error())
return
}
// receiver() reads all incoming messages from the consumer, and parses them into // receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points. // influxdb metric points.
func (m *MQTTConsumer) receiver() { func (m *MQTTConsumer) receiver() {
@ -231,6 +235,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.SetAutoReconnect(true) opts.SetAutoReconnect(true)
opts.SetKeepAlive(time.Second * 60) opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession) opts.SetCleanSession(!m.PersistentSession)
opts.SetOnConnectHandler(m.onConnect)
opts.SetConnectionLostHandler(m.onConnectionLost)
return opts, nil return opts, nil
} }