From 986186a440521bde1d97cc4ce8b344dae06d9aa0 Mon Sep 17 00:00:00 2001 From: Mike Gent Date: Sun, 3 Jun 2018 17:52:00 -0500 Subject: [PATCH] Add static routing_key option to amqp output (#3994) --- etc/telegraf.conf | 6 ++++-- plugins/outputs/amqp/README.md | 17 +++++++++++++---- plugins/outputs/amqp/amqp.go | 10 +++++++++- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 1e7e8bed1..12baeb982 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -168,8 +168,11 @@ # ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as # ## described here: https://www.rabbitmq.com/plugins.html # # auth_method = "PLAIN" +# ## Topic routing key +# # routing_key = "" # ## Telegraf tag to use as a routing key # ## ie, if this tag exists, its value will be used as the routing key +# ## and override routing_key config even if defined # routing_tag = "host" # ## Delivery Mode controls if a published message is persistent # ## Valid options are "transient" and "persistent". default: "transient" @@ -2328,7 +2331,7 @@ # reverse_metric_names = true -# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver +# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver # [[inputs.opensmtpd]] # ## If running as a restricted user you can prepend sudo for additional access: # #use_sudo = false @@ -3630,4 +3633,3 @@ # [[inputs.zipkin]] # # path = "/api/v1/spans" # URL path for span data # # port = 9411 # Port on which Telegraf listens - diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index ea17fe769..52a4ccbd1 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -2,12 +2,18 @@ This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). -Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key. +Metrics are written to a topic exchange using a routing key defined by: +1. The routing_key config defines a static value +2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found +3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used -If RoutingTag is empty, then empty routing key will be used. -Metrics are grouped in batches by RoutingTag. +Metrics are grouped in batches by the final routing key. -This plugin doesn't bind exchange to a queue, so it should be done by consumer. +This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic. +To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange, +declare and bind a single queue with the same routing_key, and consume from the same queue in each worker. +To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind, +and consume from individual queues in each worker. For an introduction to AMQP see: - https://www.rabbitmq.com/tutorials/amqp-concepts.html @@ -26,8 +32,11 @@ For an introduction to AMQP see: ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" + ## Topic routing key + # routing_key = "" ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key + ## and override routing_key config even if defined routing_tag = "host" ## Delivery Mode controls if a published message is persistent ## Valid options are "transient" and "persistent". default: "transient" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index f2bfb7ac7..bd3068eb8 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -30,7 +30,9 @@ type AMQP struct { Exchange string // AMQP Auth method AuthMethod string - // Routing Key Tag + // Routing Key (static) + RoutingKey string `toml:"routing_key"` + // Routing Key from Tag RoutingTag string `toml:"routing_tag"` // InfluxDB database Database string @@ -77,8 +79,11 @@ var sampleConfig = ` ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" + ## Topic routing key + # routing_key = "" ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key + ## and override routing_key config even if defined routing_tag = "host" ## Delivery Mode controls if a published message is persistent ## Valid options are "transient" and "persistent". default: "transient" @@ -234,6 +239,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { var key string + if q.RoutingKey != "" { + key = q.RoutingKey + } if q.RoutingTag != "" { if h, ok := metric.Tags()[q.RoutingTag]; ok { key = h