From df0bded83e12f0c14b8cb07553403b347f599e4d Mon Sep 17 00:00:00 2001 From: rabhis Date: Wed, 28 Mar 2018 21:00:56 +0000 Subject: [PATCH] Reconnect AMQP consumer to broker (#3947) --- plugins/inputs/amqp_consumer/amqp_consumer.go | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index ee1b9aee1..c96272fa7 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -145,23 +145,25 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { go a.process(msgs, acc) go func() { - err := <-a.conn.NotifyClose(make(chan *amqp.Error)) - if err == nil { - return - } - - log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err) for { - msgs, err := a.connect(amqpConf) - if err != nil { - log.Printf("E! AMQP connection failed: %s", err) - time.Sleep(10 * time.Second) - continue + err := <-a.conn.NotifyClose(make(chan *amqp.Error)) + if err == nil { + break } - a.wg.Add(1) - go a.process(msgs, acc) - break + log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err) + for { + msgs, err := a.connect(amqpConf) + if err != nil { + log.Printf("E! AMQP connection failed: %s", err) + time.Sleep(10 * time.Second) + continue + } + + a.wg.Add(1) + go a.process(msgs, acc) + break + } } }()