Ack delivery if it is unparseable in amqp_consumer input (#5286)

This commit is contained in:
Daniel Nelson 2019-01-15 11:48:36 -08:00 committed by GitHub
parent 42184fd1c8
commit 193aba8673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 0 deletions

View File

@ -430,6 +430,14 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
// Discard the message from the queue; will never be able to process
// this message.
rejErr := d.Ack(false)
if rejErr != nil {
log.Printf("E! [inputs.amqp_consumer] Unable to reject message: %d: %v",
d.DeliveryTag, rejErr)
a.conn.Close()
}
return err
}