From 8ec8ae0587f17484f40fa2f6e36fc5d3ff5531b2 Mon Sep 17 00:00:00 2001 From: chaton78 Date: Wed, 13 Apr 2016 22:58:45 -0400 Subject: [PATCH] Added onConnection and connectionLost Handlers --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 72e0d3c19..9cb420a42 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -3,6 +3,7 @@ package mqtt_consumer import ( "fmt" "log" + "strings" "sync" "time" @@ -46,8 +47,8 @@ type MQTTConsumer struct { // keep the accumulator internally: acc telegraf.Accumulator - - started bool false + + started bool } var sampleConfig = ` @@ -102,6 +103,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + m.started = false if m.PersistentSession && m.ClientID == "" { return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + @@ -118,9 +120,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return err } - opts.OnConnect = onConnect - - m.client = mqtt.NewClient(opts) if token := m.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() @@ -129,14 +128,12 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.in = make(chan mqtt.Message, 1000) m.done = make(chan struct{}) - - go m.receiver() return nil } - func onConnect(c *MQTT.Client) { - if (!m.PersistentSession || !m.started) { +func (m *MQTTConsumer) onConnect(c mqtt.Client) { + if !m.PersistentSession || !m.started { topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) @@ -145,11 +142,18 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { subscribeToken.Wait() if subscribeToken.Error() != nil { log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", - string(m.Topics), err.Error()) + strings.Join(m.Topics[:], ","), subscribeToken.Error()) } - m.started = true; - } - } + m.started = true + } + return +} + +func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { + log.Printf("MQTT Connection lost\nerror: %s\nClient should retry to reconnect", err.Error()) + return +} + // receiver() reads all incoming messages from the consumer, and parses them into // influxdb metric points. func (m *MQTTConsumer) receiver() { @@ -231,6 +235,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts.SetAutoReconnect(true) opts.SetKeepAlive(time.Second * 60) opts.SetCleanSession(!m.PersistentSession) + opts.SetOnConnectHandler(m.onConnect) + opts.SetConnectionLostHandler(m.onConnectionLost) return opts, nil }