Add passive mode exchange declaration option to amqp consumer input (#3995)

This commit is contained in:
Mike Gent 2018-06-03 17:52:59 -05:00 committed by Daniel Nelson
parent 986186a440
commit 850eff5246
3 changed files with 29 additions and 9 deletions

View File

@ -3036,6 +3036,8 @@
# url = "amqp://localhost:5672/influxdb" # url = "amqp://localhost:5672/influxdb"
# ## AMQP exchange # ## AMQP exchange
# exchange = "telegraf" # exchange = "telegraf"
# ## Exchange passive mode
# exchange_passive = false
# ## AMQP queue name # ## AMQP queue name
# queue = "telegraf" # queue = "telegraf"
# ## Binding Key # ## Binding Key

View File

@ -19,6 +19,8 @@ The following defaults are known to work with RabbitMQ:
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## AMQP queue name ## AMQP queue name
queue = "telegraf" queue = "telegraf"
## Binding Key ## Binding Key

View File

@ -20,6 +20,8 @@ type AMQPConsumer struct {
URL string URL string
// AMQP exchange // AMQP exchange
Exchange string Exchange string
// Exchange passive mode
ExchangePassive bool
// Queue Name // Queue Name
Queue string Queue string
// Binding Key // Binding Key
@ -58,6 +60,8 @@ func (a *AMQPConsumer) SampleConfig() string {
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## AMQP queue name ## AMQP queue name
queue = "telegraf" queue = "telegraf"
## Binding Key ## Binding Key
@ -174,6 +178,17 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed to open a channel: %s", err) return nil, fmt.Errorf("Failed to open a channel: %s", err)
} }
if a.ExchangePassive == true {
err = ch.ExchangeDeclarePassive(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
} else {
err = ch.ExchangeDeclare( err = ch.ExchangeDeclare(
a.Exchange, // name a.Exchange, // name
"topic", // type "topic", // type
@ -183,6 +198,7 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
}
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to declare an exchange: %s", err) return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
} }