Add static routing_key option to amqp output (#3994)

This commit is contained in:
Mike Gent 2018-06-03 17:52:00 -05:00 committed by Daniel Nelson
parent 414d7b9543
commit 986186a440
3 changed files with 26 additions and 7 deletions

View File

@ -168,8 +168,11 @@
# ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
# ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN"
# ## Topic routing key
# # routing_key = ""
# ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, its value will be used as the routing key
# ## and override routing_key config even if defined
# routing_tag = "host"
# ## Delivery Mode controls if a published message is persistent
# ## Valid options are "transient" and "persistent". default: "transient"
@ -2328,7 +2331,7 @@
# reverse_metric_names = true
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
@ -3630,4 +3633,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

View File

@ -2,12 +2,18 @@
This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
Metrics are written to a topic exchange using a routing key defined by:
1. The routing_key config defines a static value
2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found
3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used
If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
Metrics are grouped in batches by the final routing key.
This plugin doesn't bind exchange to a queue, so it should be done by consumer.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic.
To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange,
declare and bind a single queue with the same routing_key, and consume from the same queue in each worker.
To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind,
and consume from individual queues in each worker.
For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
@ -26,8 +32,11 @@ For an introduction to AMQP see:
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"

View File

@ -30,7 +30,9 @@ type AMQP struct {
Exchange string
// AMQP Auth method
AuthMethod string
// Routing Key Tag
// Routing Key (static)
RoutingKey string `toml:"routing_key"`
// Routing Key from Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
@ -77,8 +79,11 @@ var sampleConfig = `
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
@ -234,6 +239,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
var key string
if q.RoutingKey != "" {
key = q.RoutingKey
}
if q.RoutingTag != "" {
if h, ok := metric.Tags()[q.RoutingTag]; ok {
key = h