diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 12baeb982..41f244ec8 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -3036,6 +3036,8 @@ # url = "amqp://localhost:5672/influxdb" # ## AMQP exchange # exchange = "telegraf" +# ## Exchange passive mode +# exchange_passive = false # ## AMQP queue name # queue = "telegraf" # ## Binding Key diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index a14e2c8b0..c10a4410a 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -19,6 +19,8 @@ The following defaults are known to work with RabbitMQ: url = "amqp://localhost:5672/influxdb" ## AMQP exchange exchange = "telegraf" + ## Exchange passive mode + exchange_passive = false ## AMQP queue name queue = "telegraf" ## Binding Key diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 48458a0b7..6ed79f6c4 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -20,6 +20,8 @@ type AMQPConsumer struct { URL string // AMQP exchange Exchange string + // Exchange passive mode + ExchangePassive bool // Queue Name Queue string // Binding Key @@ -58,6 +60,8 @@ func (a *AMQPConsumer) SampleConfig() string { url = "amqp://localhost:5672/influxdb" ## AMQP exchange exchange = "telegraf" + ## Exchange passive mode + exchange_passive = false ## AMQP queue name queue = "telegraf" ## Binding Key @@ -174,15 +178,27 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return nil, fmt.Errorf("Failed to open a channel: %s", err) } - err = ch.ExchangeDeclare( - a.Exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) + if a.ExchangePassive == true { + err = ch.ExchangeDeclarePassive( + a.Exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + } else { + err = ch.ExchangeDeclare( + a.Exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + } if err != nil { return nil, fmt.Errorf("Failed to declare an exchange: %s", err) }