From bca73f09231784587b18c5127c089204e785625f Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 30 Nov 2017 18:40:12 -0800 Subject: [PATCH] Add option to amqp output to publish persistent messages (#3528) --- plugins/outputs/amqp/README.md | 3 +++ plugins/outputs/amqp/amqp.go | 28 ++++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index d79af597a..834074436 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -29,6 +29,9 @@ For an introduction to AMQP see: ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" + ## Delivery Mode controls if a published message is persistent + ## Valid options are "transient" and "persistent". default: "transient" + # delivery_mode = "transient" ## InfluxDB retention policy # retention_policy = "default" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 75fe4c711..fed1edfe4 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -39,6 +39,9 @@ type AMQP struct { Precision string // Connection timeout Timeout internal.Duration + // Delivery Mode controls if a published message is persistent + // Valid options are "transient" and "persistent". default: "transient" + DeliveryMode string // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -52,7 +55,8 @@ type AMQP struct { sync.Mutex c *client - serializer serializers.Serializer + deliveryMode uint8 + serializer serializers.Serializer } type externalAuth struct{} @@ -82,6 +86,9 @@ var sampleConfig = ` ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" + ## Delivery Mode controls if a published message is persistent + ## Valid options are "transient" and "persistent". default: "transient" + delivery_mode = "transient" ## InfluxDB retention policy # retention_policy = "default" @@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) { } func (q *AMQP) Connect() error { + switch q.DeliveryMode { + case "transient": + q.deliveryMode = amqp.Transient + break + case "persistent": + q.deliveryMode = amqp.Persistent + break + default: + q.deliveryMode = amqp.Transient + break + } + headers := amqp.Table{ "database": q.Database, "retention_policy": q.RetentionPolicy, @@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { false, // mandatory false, // immediate amqp.Publishing{ - Headers: c.headers, - ContentType: "text/plain", - Body: buf, + Headers: c.headers, + ContentType: "text/plain", + Body: buf, + DeliveryMode: q.deliveryMode, }) if err != nil { return fmt.Errorf("Failed to send AMQP message: %s", err)