diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index bc42f9107..133531421 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -46,6 +46,10 @@ The following defaults are known to work with RabbitMQ: ## AMQP queue name queue = "telegraf" + + ## AMQP queue durability can be "transient" or "durable". + queue_durability = "durable" + ## Binding Key binding_key = "#" diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 739ed76e4..33cd9971b 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -30,7 +30,9 @@ type AMQPConsumer struct { ExchangeArguments map[string]string `toml:"exchange_arguments"` // Queue Name - Queue string + Queue string `toml:"queue"` + QueueDurability string `toml:"queue_durability"` + // Binding Key BindingKey string `toml:"binding_key"` @@ -64,6 +66,8 @@ const ( DefaultExchangeType = "topic" DefaultExchangeDurability = "durable" + DefaultQueueDurability = "durable" + DefaultPrefetchCount = 50 ) @@ -98,10 +102,13 @@ func (a *AMQPConsumer) SampleConfig() string { # exchange_arguments = { } # exchange_arguments = {"hash_propery" = "timestamp"} - ## AMQP queue name + ## AMQP queue name. queue = "telegraf" - ## Binding Key + ## AMQP queue durability can be "transient" or "durable". + queue_durability = "durable" + + ## Binding Key. binding_key = "#" ## Maximum number of messages server should give to the worker. @@ -260,13 +267,21 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return nil, err } + var queueDurable = true + switch a.QueueDurability { + case "transient": + queueDurable = false + default: + queueDurable = true + } + q, err := ch.QueueDeclare( - a.Queue, // queue - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + a.Queue, // queue + queueDurable, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) if err != nil { return nil, fmt.Errorf("Failed to declare a queue: %s", err) @@ -380,6 +395,7 @@ func init() { AuthMethod: DefaultAuthMethod, ExchangeType: DefaultExchangeType, ExchangeDurability: DefaultExchangeDurability, + QueueDurability: DefaultQueueDurability, PrefetchCount: DefaultPrefetchCount, } })