diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index c10a4410a..53d97d080 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -17,23 +17,36 @@ The following defaults are known to work with RabbitMQ: [[inputs.amqp_consumer]] ## AMQP url url = "amqp://localhost:5672/influxdb" - ## AMQP exchange + + ## Exchange to declare and consume from. exchange = "telegraf" - ## Exchange passive mode - exchange_passive = false + + ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". + # exchange_type = "topic" + + ## If true, exchange will be passively declared. + # exchange_passive = false + + ## Exchange durability can be either "transient" or "durable". + # exchange_durability = "durable" + + ## Additional exchange arguments. + # exchange_args = { } + # exchange_args = {"hash_propery" = "timestamp"} + ## AMQP queue name queue = "telegraf" ## Binding Key binding_key = "#" - ## Controls how many messages the server will try to keep on the network - ## for consumers before receiving delivery acks. - #prefetch_count = 50 + ## Maximum number of messages server should give to the worker. + prefetch_count = 50 - ## Auth method. PLAIN and EXTERNAL are supported. + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 6ed79f6c4..ad0e3ce7b 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -18,10 +18,13 @@ import ( // AMQPConsumer is the top level struct for this plugin type AMQPConsumer struct { URL string - // AMQP exchange - Exchange string - // Exchange passive mode - ExchangePassive bool + + Exchange string `toml:"exchange"` + ExchangeType string `toml:"exchange_type"` + ExchangeDurability string `toml:"exchange_durability"` + ExchangePassive bool `toml:"exchange_passive"` + ExchangeArguments map[string]string `toml:"exchange_arguments"` + // Queue Name Queue string // Binding Key @@ -50,7 +53,11 @@ func (a *externalAuth) Response() string { } const ( - DefaultAuthMethod = "PLAIN" + DefaultAuthMethod = "PLAIN" + + DefaultExchangeType = "topic" + DefaultExchangeDurability = "durable" + DefaultPrefetchCount = 50 ) @@ -58,10 +65,23 @@ func (a *AMQPConsumer) SampleConfig() string { return ` ## AMQP url url = "amqp://localhost:5672/influxdb" - ## AMQP exchange + + ## Exchange to declare and consume from. exchange = "telegraf" - ## Exchange passive mode - exchange_passive = false + + ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". + # exchange_type = "topic" + + ## If true, exchange will be passively declared. + # exchange_passive = false + + ## Exchange durability can be either "transient" or "durable". + # exchange_durability = "durable" + + ## Additional exchange arguments. + # exchange_args = { } + # exchange_args = {"hash_propery" = "timestamp"} + ## AMQP queue name queue = "telegraf" ## Binding Key @@ -178,29 +198,28 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return nil, fmt.Errorf("Failed to open a channel: %s", err) } - if a.ExchangePassive == true { - err = ch.ExchangeDeclarePassive( - a.Exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) - } else { - err = ch.ExchangeDeclare( - a.Exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) + var exchangeDurable = true + switch a.ExchangeDurability { + case "transient": + exchangeDurable = false + default: + exchangeDurable = true } + + exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) + for k, v := range a.ExchangeArguments { + exchangeArgs[k] = v + } + + err = declareExchange( + ch, + a.Exchange, + a.ExchangeType, + a.ExchangePassive, + exchangeDurable, + exchangeArgs) if err != nil { - return nil, fmt.Errorf("Failed to declare an exchange: %s", err) + return nil, err } q, err := ch.QueueDeclare( @@ -252,6 +271,42 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return msgs, err } +func declareExchange( + channel *amqp.Channel, + exchangeName string, + exchangeType string, + exchangePassive bool, + exchangeDurable bool, + exchangeArguments amqp.Table, +) error { + var err error + if exchangePassive { + err = channel.ExchangeDeclarePassive( + exchangeName, + exchangeType, + exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + exchangeArguments, + ) + } else { + err = channel.ExchangeDeclare( + exchangeName, + exchangeType, + exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + exchangeArguments, + ) + } + if err != nil { + return fmt.Errorf("error declaring exchange: %v", err) + } + return nil +} + // Read messages from queue and add them to the Accumulator func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { defer a.wg.Done() @@ -283,8 +338,10 @@ func (a *AMQPConsumer) Stop() { func init() { inputs.Add("amqp_consumer", func() telegraf.Input { return &AMQPConsumer{ - AuthMethod: DefaultAuthMethod, - PrefetchCount: DefaultPrefetchCount, + AuthMethod: DefaultAuthMethod, + ExchangeType: DefaultExchangeType, + ExchangeDurability: DefaultExchangeDurability, + PrefetchCount: DefaultPrefetchCount, } }) } diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index 52a4ccbd1..bb66a030a 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -21,13 +21,28 @@ For an introduction to AMQP see: ### Configuration: -``` +```toml # Configuration for the AMQP server to send metrics to [[outputs.amqp]] ## AMQP url url = "amqp://localhost:5672/influxdb" - ## AMQP exchange + + ## Exchange to declare and publish to. exchange = "telegraf" + + ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". + # exchange_type = "topic" + + ## If true, exchange will be passively declared. + # exchange_passive = false + + ## Exchange durability can be either "transient" or "durable". + # exchange_durability = "durable" + + ## Additional exchange arguments. + # exchange_args = { } + # exchange_args = {"hash_propery" = "timestamp"} + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html @@ -40,7 +55,7 @@ For an introduction to AMQP see: routing_tag = "host" ## Delivery Mode controls if a published message is persistent ## Valid options are "transient" and "persistent". default: "transient" - # delivery_mode = "transient" + delivery_mode = "transient" ## InfluxDB retention policy # retention_policy = "default" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index bd3068eb8..8a3bcbd6c 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -26,8 +26,13 @@ type client struct { type AMQP struct { // AMQP brokers to send metrics to URL string - // AMQP exchange - Exchange string + + Exchange string `toml:"exchange"` + ExchangeType string `toml:"exchange_type"` + ExchangeDurability string `toml:"exchange_durability"` + ExchangePassive bool `toml:"exchange_passive"` + ExchangeArguments map[string]string `toml:"exchange_arguments"` + // AMQP Auth method AuthMethod string // Routing Key (static) @@ -65,7 +70,11 @@ func (a *externalAuth) Response() string { } const ( - DefaultAuthMethod = "PLAIN" + DefaultAuthMethod = "PLAIN" + + DefaultExchangeType = "topic" + DefaultExchangeDurability = "durable" + DefaultRetentionPolicy = "default" DefaultDatabase = "telegraf" ) @@ -73,8 +82,23 @@ const ( var sampleConfig = ` ## AMQP url url = "amqp://localhost:5672/influxdb" - ## AMQP exchange + + ## Exchange to declare and publish to. exchange = "telegraf" + + ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". + # exchange_type = "topic" + + ## If true, exchange will be passively declared. + # exchange_passive = false + + ## Exchange durability can be either "transient" or "durable". + # exchange_durability = "durable" + + ## Additional exchange arguments. + # exchange_args = { } + # exchange_args = {"hash_propery" = "timestamp"} + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html @@ -166,17 +190,28 @@ func (q *AMQP) Connect() error { return fmt.Errorf("Failed to open a channel: %s", err) } - err = channel.ExchangeDeclare( - q.Exchange, // name - "topic", // type - true, // durable - false, // delete when unused - false, // internal - false, // no-wait - nil, // arguments - ) + var exchangeDurable = true + switch q.ExchangeDurability { + case "transient": + exchangeDurable = false + default: + exchangeDurable = true + } + + exchangeArgs := make(amqp.Table, len(q.ExchangeArguments)) + for k, v := range q.ExchangeArguments { + exchangeArgs[k] = v + } + + err = declareExchange( + channel, + q.Exchange, + q.ExchangeType, + q.ExchangePassive, + exchangeDurable, + exchangeArgs) if err != nil { - return fmt.Errorf("Failed to declare an exchange: %s", err) + return err } q.setClient(&client{ @@ -203,6 +238,42 @@ func (q *AMQP) Connect() error { return nil } +func declareExchange( + channel *amqp.Channel, + exchangeName string, + exchangeType string, + exchangePassive bool, + exchangeDurable bool, + exchangeArguments amqp.Table, +) error { + var err error + if exchangePassive { + err = channel.ExchangeDeclarePassive( + exchangeName, + exchangeType, + exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + exchangeArguments, + ) + } else { + err = channel.ExchangeDeclare( + exchangeName, + exchangeType, + exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + exchangeArguments, + ) + } + if err != nil { + return fmt.Errorf("error declaring exchange: %v", err) + } + return nil +} + func (q *AMQP) Close() error { c := q.getClient() if c == nil { @@ -292,10 +363,12 @@ func (q *AMQP) setClient(c *client) { func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ - AuthMethod: DefaultAuthMethod, - Database: DefaultDatabase, - RetentionPolicy: DefaultRetentionPolicy, - Timeout: internal.Duration{Duration: time.Second * 5}, + AuthMethod: DefaultAuthMethod, + ExchangeType: DefaultExchangeType, + ExchangeDurability: DefaultExchangeDurability, + Database: DefaultDatabase, + RetentionPolicy: DefaultRetentionPolicy, + Timeout: internal.Duration{Duration: time.Second * 5}, } }) }