Reconnect AMQP consumer to broker (#3947)

This commit is contained in:
rabhis 2018-03-28 21:00:56 +00:00 committed by Daniel Nelson
parent 8a73dc05c0
commit 5be1198274
1 changed files with 16 additions and 14 deletions

View File

@ -145,23 +145,25 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
go a.process(msgs, acc) go a.process(msgs, acc)
go func() { go func() {
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
for { for {
msgs, err := a.connect(amqpConf) err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err != nil { if err == nil {
log.Printf("E! AMQP connection failed: %s", err) break
time.Sleep(10 * time.Second)
continue
} }
a.wg.Add(1) log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
go a.process(msgs, acc) for {
break msgs, err := a.connect(amqpConf)
if err != nil {
log.Printf("E! AMQP connection failed: %s", err)
time.Sleep(10 * time.Second)
continue
}
a.wg.Add(1)
go a.process(msgs, acc)
break
}
} }
}() }()