diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index e33aad274..a5c8c5a9f 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -19,11 +19,24 @@ type AMQP struct { Exchange string // Routing Key Tag RoutingTag string `toml:"routing_tag"` + // InfluxDB database + Database string + // InfluxDB retention policy + RetentionPolicy string + // InfluxDB precision + Precision string channel *amqp.Channel sync.Mutex + headers amqp.Table } +const ( + DefaultRetentionPolicy = "default" + DefaultDatabase = "telegraf" + DefaultPrecision = "s" +) + var sampleConfig = ` # AMQP url url = "amqp://localhost:5672/influxdb" @@ -32,11 +45,25 @@ var sampleConfig = ` # Telegraf tag to use as a routing key # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + + # InfluxDB retention policy + #retention_policy = "default" + # InfluxDB database + #database = "telegraf" + # InfluxDB precision + #precision = "s" ` func (q *AMQP) Connect() error { q.Lock() defer q.Unlock() + + q.headers = amqp.Table{ + "precision": q.Precision, + "database": q.Database, + "retention_policy": q.RetentionPolicy, + } + connection, err := amqp.Dial(q.URL) if err != nil { return err @@ -112,6 +139,7 @@ func (q *AMQP) Write(points []*client.Point) error { false, // mandatory false, // immediate amqp.Publishing{ + Headers: q.headers, ContentType: "text/plain", Body: bytes.Join(buf, []byte("\n")), }) @@ -124,6 +152,10 @@ func (q *AMQP) Write(points []*client.Point) error { func init() { outputs.Add("amqp", func() outputs.Output { - return &AMQP{} + return &AMQP{ + Database: DefaultDatabase, + Precision: DefaultPrecision, + RetentionPolicy: DefaultRetentionPolicy, + } }) }