Add queue_durability parameter to amqp_consumer input (#4628)

This commit is contained in:
Andrew 2018-09-05 23:27:52 +02:00 committed by Daniel Nelson
parent 13029a1fa4
commit a47149765e
2 changed files with 29 additions and 9 deletions

View File

@ -46,6 +46,10 @@ The following defaults are known to work with RabbitMQ:
## AMQP queue name ## AMQP queue name
queue = "telegraf" queue = "telegraf"
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## Binding Key ## Binding Key
binding_key = "#" binding_key = "#"

View File

@ -30,7 +30,9 @@ type AMQPConsumer struct {
ExchangeArguments map[string]string `toml:"exchange_arguments"` ExchangeArguments map[string]string `toml:"exchange_arguments"`
// Queue Name // Queue Name
Queue string Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
// Binding Key // Binding Key
BindingKey string `toml:"binding_key"` BindingKey string `toml:"binding_key"`
@ -64,6 +66,8 @@ const (
DefaultExchangeType = "topic" DefaultExchangeType = "topic"
DefaultExchangeDurability = "durable" DefaultExchangeDurability = "durable"
DefaultQueueDurability = "durable"
DefaultPrefetchCount = 50 DefaultPrefetchCount = 50
) )
@ -98,10 +102,13 @@ func (a *AMQPConsumer) SampleConfig() string {
# exchange_arguments = { } # exchange_arguments = { }
# exchange_arguments = {"hash_propery" = "timestamp"} # exchange_arguments = {"hash_propery" = "timestamp"}
## AMQP queue name ## AMQP queue name.
queue = "telegraf" queue = "telegraf"
## Binding Key ## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## Binding Key.
binding_key = "#" binding_key = "#"
## Maximum number of messages server should give to the worker. ## Maximum number of messages server should give to the worker.
@ -260,9 +267,17 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, err return nil, err
} }
var queueDurable = true
switch a.QueueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}
q, err := ch.QueueDeclare( q, err := ch.QueueDeclare(
a.Queue, // queue a.Queue, // queue
true, // durable queueDurable, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
false, // no-wait false, // no-wait
@ -380,6 +395,7 @@ func init() {
AuthMethod: DefaultAuthMethod, AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType, ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability, ExchangeDurability: DefaultExchangeDurability,
QueueDurability: DefaultQueueDurability,
PrefetchCount: DefaultPrefetchCount, PrefetchCount: DefaultPrefetchCount,
} }
}) })