Add static routing_key option to amqp output (#3994)
This commit is contained in:
parent
b556eb8b2f
commit
2cc2913d81
|
@ -168,8 +168,11 @@
|
||||||
# ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
# ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
# ## described here: https://www.rabbitmq.com/plugins.html
|
# ## described here: https://www.rabbitmq.com/plugins.html
|
||||||
# # auth_method = "PLAIN"
|
# # auth_method = "PLAIN"
|
||||||
|
# ## Topic routing key
|
||||||
|
# # routing_key = ""
|
||||||
# ## Telegraf tag to use as a routing key
|
# ## Telegraf tag to use as a routing key
|
||||||
# ## ie, if this tag exists, its value will be used as the routing key
|
# ## ie, if this tag exists, its value will be used as the routing key
|
||||||
|
# ## and override routing_key config even if defined
|
||||||
# routing_tag = "host"
|
# routing_tag = "host"
|
||||||
# ## Delivery Mode controls if a published message is persistent
|
# ## Delivery Mode controls if a published message is persistent
|
||||||
# ## Valid options are "transient" and "persistent". default: "transient"
|
# ## Valid options are "transient" and "persistent". default: "transient"
|
||||||
|
@ -3630,4 +3633,3 @@
|
||||||
# [[inputs.zipkin]]
|
# [[inputs.zipkin]]
|
||||||
# # path = "/api/v1/spans" # URL path for span data
|
# # path = "/api/v1/spans" # URL path for span data
|
||||||
# # port = 9411 # Port on which Telegraf listens
|
# # port = 9411 # Port on which Telegraf listens
|
||||||
|
|
||||||
|
|
|
@ -2,12 +2,18 @@
|
||||||
|
|
||||||
This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
|
This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
|
||||||
|
|
||||||
Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
|
Metrics are written to a topic exchange using a routing key defined by:
|
||||||
|
1. The routing_key config defines a static value
|
||||||
|
2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found
|
||||||
|
3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used
|
||||||
|
|
||||||
If RoutingTag is empty, then empty routing key will be used.
|
Metrics are grouped in batches by the final routing key.
|
||||||
Metrics are grouped in batches by RoutingTag.
|
|
||||||
|
|
||||||
This plugin doesn't bind exchange to a queue, so it should be done by consumer.
|
This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic.
|
||||||
|
To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange,
|
||||||
|
declare and bind a single queue with the same routing_key, and consume from the same queue in each worker.
|
||||||
|
To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind,
|
||||||
|
and consume from individual queues in each worker.
|
||||||
|
|
||||||
For an introduction to AMQP see:
|
For an introduction to AMQP see:
|
||||||
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
|
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
|
||||||
|
@ -26,8 +32,11 @@ For an introduction to AMQP see:
|
||||||
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
## described here: https://www.rabbitmq.com/plugins.html
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
# auth_method = "PLAIN"
|
# auth_method = "PLAIN"
|
||||||
|
## Topic routing key
|
||||||
|
# routing_key = ""
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, its value will be used as the routing key
|
## ie, if this tag exists, its value will be used as the routing key
|
||||||
|
## and override routing_key config even if defined
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
## Delivery Mode controls if a published message is persistent
|
## Delivery Mode controls if a published message is persistent
|
||||||
## Valid options are "transient" and "persistent". default: "transient"
|
## Valid options are "transient" and "persistent". default: "transient"
|
||||||
|
|
|
@ -30,7 +30,9 @@ type AMQP struct {
|
||||||
Exchange string
|
Exchange string
|
||||||
// AMQP Auth method
|
// AMQP Auth method
|
||||||
AuthMethod string
|
AuthMethod string
|
||||||
// Routing Key Tag
|
// Routing Key (static)
|
||||||
|
RoutingKey string `toml:"routing_key"`
|
||||||
|
// Routing Key from Tag
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
// InfluxDB database
|
// InfluxDB database
|
||||||
Database string
|
Database string
|
||||||
|
@ -77,8 +79,11 @@ var sampleConfig = `
|
||||||
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
## described here: https://www.rabbitmq.com/plugins.html
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
# auth_method = "PLAIN"
|
# auth_method = "PLAIN"
|
||||||
|
## Topic routing key
|
||||||
|
# routing_key = ""
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, its value will be used as the routing key
|
## ie, if this tag exists, its value will be used as the routing key
|
||||||
|
## and override routing_key config even if defined
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
## Delivery Mode controls if a published message is persistent
|
## Delivery Mode controls if a published message is persistent
|
||||||
## Valid options are "transient" and "persistent". default: "transient"
|
## Valid options are "transient" and "persistent". default: "transient"
|
||||||
|
@ -234,6 +239,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
var key string
|
var key string
|
||||||
|
if q.RoutingKey != "" {
|
||||||
|
key = q.RoutingKey
|
||||||
|
}
|
||||||
if q.RoutingTag != "" {
|
if q.RoutingTag != "" {
|
||||||
if h, ok := metric.Tags()[q.RoutingTag]; ok {
|
if h, ok := metric.Tags()[q.RoutingTag]; ok {
|
||||||
key = h
|
key = h
|
||||||
|
|
Loading…
Reference in New Issue