Add option to amqp output to publish persistent messages (#3528)

This commit is contained in:
Daniel Nelson 2017-11-30 18:40:12 -08:00 committed by GitHub
parent a9951710b3
commit 44320a5421
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 4 deletions

View File

@ -29,6 +29,9 @@ For an introduction to AMQP see:
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
# delivery_mode = "transient"
## InfluxDB retention policy ## InfluxDB retention policy
# retention_policy = "default" # retention_policy = "default"

View File

@ -39,6 +39,9 @@ type AMQP struct {
Precision string Precision string
// Connection timeout // Connection timeout
Timeout internal.Duration Timeout internal.Duration
// Delivery Mode controls if a published message is persistent
// Valid options are "transient" and "persistent". default: "transient"
DeliveryMode string
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -52,7 +55,8 @@ type AMQP struct {
sync.Mutex sync.Mutex
c *client c *client
serializer serializers.Serializer deliveryMode uint8
serializer serializers.Serializer
} }
type externalAuth struct{} type externalAuth struct{}
@ -82,6 +86,9 @@ var sampleConfig = `
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
delivery_mode = "transient"
## InfluxDB retention policy ## InfluxDB retention policy
# retention_policy = "default" # retention_policy = "default"
@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
} }
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {
switch q.DeliveryMode {
case "transient":
q.deliveryMode = amqp.Transient
break
case "persistent":
q.deliveryMode = amqp.Persistent
break
default:
q.deliveryMode = amqp.Transient
break
}
headers := amqp.Table{ headers := amqp.Table{
"database": q.Database, "database": q.Database,
"retention_policy": q.RetentionPolicy, "retention_policy": q.RetentionPolicy,
@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
Headers: c.headers, Headers: c.headers,
ContentType: "text/plain", ContentType: "text/plain",
Body: buf, Body: buf,
DeliveryMode: q.deliveryMode,
}) })
if err != nil { if err != nil {
return fmt.Errorf("Failed to send AMQP message: %s", err) return fmt.Errorf("Failed to send AMQP message: %s", err)