Added onConnection and connectionLost Handlers
This commit is contained in:
		
							parent
							
								
									ce94e636bb
								
							
						
					
					
						commit
						8ec8ae0587
					
				|  | @ -3,6 +3,7 @@ package mqtt_consumer | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"log" | 	"log" | ||||||
|  | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | @ -46,8 +47,8 @@ type MQTTConsumer struct { | ||||||
| 
 | 
 | ||||||
| 	// keep the accumulator internally:
 | 	// keep the accumulator internally:
 | ||||||
| 	acc telegraf.Accumulator | 	acc telegraf.Accumulator | ||||||
| 	 | 
 | ||||||
| 	started bool false | 	started bool | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| var sampleConfig = ` | var sampleConfig = ` | ||||||
|  | @ -102,6 +103,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { | ||||||
| func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { | func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { | ||||||
| 	m.Lock() | 	m.Lock() | ||||||
| 	defer m.Unlock() | 	defer m.Unlock() | ||||||
|  | 	m.started = false | ||||||
| 
 | 
 | ||||||
| 	if m.PersistentSession && m.ClientID == "" { | 	if m.PersistentSession && m.ClientID == "" { | ||||||
| 		return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + | 		return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + | ||||||
|  | @ -118,9 +120,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	opts.OnConnect = onConnect |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 	m.client = mqtt.NewClient(opts) | 	m.client = mqtt.NewClient(opts) | ||||||
| 	if token := m.client.Connect(); token.Wait() && token.Error() != nil { | 	if token := m.client.Connect(); token.Wait() && token.Error() != nil { | ||||||
| 		return token.Error() | 		return token.Error() | ||||||
|  | @ -129,14 +128,12 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { | ||||||
| 	m.in = make(chan mqtt.Message, 1000) | 	m.in = make(chan mqtt.Message, 1000) | ||||||
| 	m.done = make(chan struct{}) | 	m.done = make(chan struct{}) | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 	go m.receiver() | 	go m.receiver() | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  func onConnect(c *MQTT.Client) { | func (m *MQTTConsumer) onConnect(c mqtt.Client) { | ||||||
|  	if (!m.PersistentSession || !m.started) { | 	if !m.PersistentSession || !m.started { | ||||||
| 		topics := make(map[string]byte) | 		topics := make(map[string]byte) | ||||||
| 		for _, topic := range m.Topics { | 		for _, topic := range m.Topics { | ||||||
| 			topics[topic] = byte(m.QoS) | 			topics[topic] = byte(m.QoS) | ||||||
|  | @ -145,11 +142,18 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { | ||||||
| 		subscribeToken.Wait() | 		subscribeToken.Wait() | ||||||
| 		if subscribeToken.Error() != nil { | 		if subscribeToken.Error() != nil { | ||||||
| 			log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", | 			log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s", | ||||||
| 				string(m.Topics), err.Error()) | 				strings.Join(m.Topics[:], ","), subscribeToken.Error()) | ||||||
| 		} | 		} | ||||||
| 		m.started = true; | 		m.started = true | ||||||
|  	} | 	} | ||||||
|  } | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { | ||||||
|  | 	log.Printf("MQTT Connection lost\nerror: %s\nClient should retry to reconnect", err.Error()) | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // receiver() reads all incoming messages from the consumer, and parses them into
 | // receiver() reads all incoming messages from the consumer, and parses them into
 | ||||||
| // influxdb metric points.
 | // influxdb metric points.
 | ||||||
| func (m *MQTTConsumer) receiver() { | func (m *MQTTConsumer) receiver() { | ||||||
|  | @ -231,6 +235,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { | ||||||
| 	opts.SetAutoReconnect(true) | 	opts.SetAutoReconnect(true) | ||||||
| 	opts.SetKeepAlive(time.Second * 60) | 	opts.SetKeepAlive(time.Second * 60) | ||||||
| 	opts.SetCleanSession(!m.PersistentSession) | 	opts.SetCleanSession(!m.PersistentSession) | ||||||
|  | 	opts.SetOnConnectHandler(m.onConnect) | ||||||
|  | 	opts.SetConnectionLostHandler(m.onConnectionLost) | ||||||
| 	return opts, nil | 	return opts, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue