Allow configuration of amqp exchange type, durability, and arguments

This commit is contained in:
Daniel Nelson 2018-06-03 16:31:11 -07:00 committed by Daniel Nelson
parent fcea745e99
commit e3f1d28908
4 changed files with 217 additions and 59 deletions

View File

@ -17,23 +17,36 @@ The following defaults are known to work with RabbitMQ:
[[inputs.amqp_consumer]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
## Exchange to declare and consume from.
exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_args = { }
# exchange_args = {"hash_propery" = "timestamp"}
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Controls how many messages the server will try to keep on the network
## for consumers before receiving delivery acks.
#prefetch_count = 50
## Maximum number of messages server should give to the worker.
prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported.
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"

View File

@ -18,10 +18,13 @@ import (
// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string
// AMQP exchange
Exchange string
// Exchange passive mode
ExchangePassive bool
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
// Queue Name
Queue string
// Binding Key
@ -50,7 +53,11 @@ func (a *externalAuth) Response() string {
}
const (
DefaultAuthMethod = "PLAIN"
DefaultAuthMethod = "PLAIN"
DefaultExchangeType = "topic"
DefaultExchangeDurability = "durable"
DefaultPrefetchCount = 50
)
@ -58,10 +65,23 @@ func (a *AMQPConsumer) SampleConfig() string {
return `
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
## Exchange to declare and consume from.
exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_args = { }
# exchange_args = {"hash_propery" = "timestamp"}
## AMQP queue name
queue = "telegraf"
## Binding Key
@ -178,29 +198,28 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}
if a.ExchangePassive == true {
err = ch.ExchangeDeclarePassive(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
} else {
err = ch.ExchangeDeclare(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
}
err = declareExchange(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
if err != nil {
return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
return nil, err
}
q, err := ch.QueueDeclare(
@ -252,6 +271,42 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return msgs, err
}
func declareExchange(
channel *amqp.Channel,
exchangeName string,
exchangeType string,
exchangePassive bool,
exchangeDurable bool,
exchangeArguments amqp.Table,
) error {
var err error
if exchangePassive {
err = channel.ExchangeDeclarePassive(
exchangeName,
exchangeType,
exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
exchangeArguments,
)
} else {
err = channel.ExchangeDeclare(
exchangeName,
exchangeType,
exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
exchangeArguments,
)
}
if err != nil {
return fmt.Errorf("error declaring exchange: %v", err)
}
return nil
}
// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
@ -283,8 +338,10 @@ func (a *AMQPConsumer) Stop() {
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
AuthMethod: DefaultAuthMethod,
PrefetchCount: DefaultPrefetchCount,
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
PrefetchCount: DefaultPrefetchCount,
}
})
}

View File

@ -21,13 +21,28 @@ For an introduction to AMQP see:
### Configuration:
```
```toml
# Configuration for the AMQP server to send metrics to
[[outputs.amqp]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
## Exchange to declare and publish to.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_args = { }
# exchange_args = {"hash_propery" = "timestamp"}
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
@ -40,7 +55,7 @@ For an introduction to AMQP see:
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
# delivery_mode = "transient"
delivery_mode = "transient"
## InfluxDB retention policy
# retention_policy = "default"

View File

@ -26,8 +26,13 @@ type client struct {
type AMQP struct {
// AMQP brokers to send metrics to
URL string
// AMQP exchange
Exchange string
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
// AMQP Auth method
AuthMethod string
// Routing Key (static)
@ -65,7 +70,11 @@ func (a *externalAuth) Response() string {
}
const (
DefaultAuthMethod = "PLAIN"
DefaultAuthMethod = "PLAIN"
DefaultExchangeType = "topic"
DefaultExchangeDurability = "durable"
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
)
@ -73,8 +82,23 @@ const (
var sampleConfig = `
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
## Exchange to declare and publish to.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_args = { }
# exchange_args = {"hash_propery" = "timestamp"}
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
@ -166,17 +190,28 @@ func (q *AMQP) Connect() error {
return fmt.Errorf("Failed to open a channel: %s", err)
}
err = channel.ExchangeDeclare(
q.Exchange, // name
"topic", // type
true, // durable
false, // delete when unused
false, // internal
false, // no-wait
nil, // arguments
)
var exchangeDurable = true
switch q.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}
exchangeArgs := make(amqp.Table, len(q.ExchangeArguments))
for k, v := range q.ExchangeArguments {
exchangeArgs[k] = v
}
err = declareExchange(
channel,
q.Exchange,
q.ExchangeType,
q.ExchangePassive,
exchangeDurable,
exchangeArgs)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
return err
}
q.setClient(&client{
@ -203,6 +238,42 @@ func (q *AMQP) Connect() error {
return nil
}
func declareExchange(
channel *amqp.Channel,
exchangeName string,
exchangeType string,
exchangePassive bool,
exchangeDurable bool,
exchangeArguments amqp.Table,
) error {
var err error
if exchangePassive {
err = channel.ExchangeDeclarePassive(
exchangeName,
exchangeType,
exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
exchangeArguments,
)
} else {
err = channel.ExchangeDeclare(
exchangeName,
exchangeType,
exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
exchangeArguments,
)
}
if err != nil {
return fmt.Errorf("error declaring exchange: %v", err)
}
return nil
}
func (q *AMQP) Close() error {
c := q.getClient()
if c == nil {
@ -292,10 +363,12 @@ func (q *AMQP) setClient(c *client) {
func init() {
outputs.Add("amqp", func() telegraf.Output {
return &AMQP{
AuthMethod: DefaultAuthMethod,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
Timeout: internal.Duration{Duration: time.Second * 5},
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}