Add option to amqp output to publish persistent messages (#3528)

This commit is contained in:
Daniel Nelson 2017-11-30 18:40:12 -08:00 committed by GitHub
parent 1dee532574
commit bca73f0923
2 changed files with 27 additions and 4 deletions

View File

@ -29,6 +29,9 @@ For an introduction to AMQP see:
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
# delivery_mode = "transient"
## InfluxDB retention policy ## InfluxDB retention policy
# retention_policy = "default" # retention_policy = "default"

View File

@ -39,6 +39,9 @@ type AMQP struct {
Precision string Precision string
// Connection timeout // Connection timeout
Timeout internal.Duration Timeout internal.Duration
// Delivery Mode controls if a published message is persistent
// Valid options are "transient" and "persistent". default: "transient"
DeliveryMode string
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -52,6 +55,7 @@ type AMQP struct {
sync.Mutex sync.Mutex
c *client c *client
deliveryMode uint8
serializer serializers.Serializer serializer serializers.Serializer
} }
@ -82,6 +86,9 @@ var sampleConfig = `
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
delivery_mode = "transient"
## InfluxDB retention policy ## InfluxDB retention policy
# retention_policy = "default" # retention_policy = "default"
@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
} }
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {
switch q.DeliveryMode {
case "transient":
q.deliveryMode = amqp.Transient
break
case "persistent":
q.deliveryMode = amqp.Persistent
break
default:
q.deliveryMode = amqp.Transient
break
}
headers := amqp.Table{ headers := amqp.Table{
"database": q.Database, "database": q.Database,
"retention_policy": q.RetentionPolicy, "retention_policy": q.RetentionPolicy,
@ -248,6 +267,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Headers: c.headers, Headers: c.headers,
ContentType: "text/plain", ContentType: "text/plain",
Body: buf, Body: buf,
DeliveryMode: q.deliveryMode,
}) })
if err != nil { if err != nil {
return fmt.Errorf("Failed to send AMQP message: %s", err) return fmt.Errorf("Failed to send AMQP message: %s", err)