From f5d400a1ce6356ce059cae15ab2d13ae1d6b002f Mon Sep 17 00:00:00 2001 From: DanKans Date: Mon, 11 Sep 2017 20:24:51 +0100 Subject: [PATCH] Fix MQTT input exits if Broker is not available on startup (#3202) --- plugins/inputs/mqtt_consumer/README.md | 2 + plugins/inputs/mqtt_consumer/mqtt_consumer.go | 59 ++++++++++++++----- .../mqtt_consumer/mqtt_consumer_test.go | 11 ++-- 3 files changed, 52 insertions(+), 20 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index 0e8d540f3..c61545e8e 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -13,6 +13,8 @@ The plugin expects messages in the servers = ["localhost:1883"] ## MQTT QoS, must be 0, 1, or 2 qos = 0 + ## Connection timeout for initial connection in seconds + connection_timeout = 30 ## Topics to subscribe to topics = [ diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 1c75aaf65..3cd98baea 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -16,11 +16,12 @@ import ( ) type MQTTConsumer struct { - Servers []string - Topics []string - Username string - Password string - QoS int `toml:"qos"` + Servers []string + Topics []string + Username string + Password string + QoS int `toml:"qos"` + ConnectionTimeout internal.Duration `toml:"connection_timeout"` parser parsers.Parser @@ -48,13 +49,15 @@ type MQTTConsumer struct { // keep the accumulator internally: acc telegraf.Accumulator - started bool + connected bool } var sampleConfig = ` servers = ["localhost:1883"] ## MQTT QoS, must be 0, 1, or 2 qos = 0 + ## Connection timeout for initial connection in seconds + connection_timeout = 30 ## Topics to subscribe to topics = [ @@ -103,7 +106,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() - m.started = false + m.connected = false if m.PersistentSession && m.ClientID == "" { return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + @@ -115,26 +118,40 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) } + if int(m.ConnectionTimeout.Duration) <= 0 { + return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %d", m.ConnectionTimeout) + } + opts, err := m.createOpts() if err != nil { return err } m.client = mqtt.NewClient(opts) - if token := m.client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - m.in = make(chan mqtt.Message, 1000) m.done = make(chan struct{}) + m.connect() + + return nil +} + +func (m *MQTTConsumer) connect() error { + if token := m.client.Connect(); token.Wait() && token.Error() != nil { + err := token.Error() + log.Printf("D! MQTT Consumer, connection error - %v", err) + + return err + } + go m.receiver() return nil } + func (m *MQTTConsumer) onConnect(c mqtt.Client) { log.Printf("I! MQTT Client Connected") - if !m.PersistentSession || !m.started { + if !m.PersistentSession || !m.connected { topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) @@ -145,7 +162,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) { m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", strings.Join(m.Topics[:], ","), subscribeToken.Error())) } - m.started = true + m.connected = true } return } @@ -186,18 +203,27 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) { func (m *MQTTConsumer) Stop() { m.Lock() defer m.Unlock() - close(m.done) - m.client.Disconnect(200) - m.started = false + + if m.connected { + close(m.done) + m.client.Disconnect(200) + m.connected = false + } } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { + if !m.connected { + m.connect() + } + return nil } func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() + opts.ConnectTimeout = m.ConnectionTimeout.Duration + if m.ClientID == "" { opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) } else { @@ -238,6 +264,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts.SetCleanSession(!m.PersistentSession) opts.SetOnConnectHandler(m.onConnect) opts.SetConnectionLostHandler(m.onConnectionLost) + return opts, nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 027e4818b..eb5e3048c 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -22,11 +22,13 @@ const ( func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { in := make(chan mqtt.Message, 100) n := &MQTTConsumer{ - Topics: []string{"telegraf"}, - Servers: []string{"localhost:1883"}, - in: in, - done: make(chan struct{}), + Topics: []string{"telegraf"}, + Servers: []string{"localhost:1883"}, + in: in, + done: make(chan struct{}), + connected: true, } + return n, in } @@ -131,6 +133,7 @@ func TestRunParserAndGather(t *testing.T) { n, in := newTestMQTTConsumer() acc := testutil.Accumulator{} n.acc = &acc + defer close(n.done) n.parser, _ = parsers.NewInfluxParser()