[amqp output] Add ability to specify influxdb database

and retention policy, as well as precision as amqp headers
This commit is contained in:
Eugene Dementiev 2015-10-30 12:02:14 +03:00
parent 7cc60dfb8f
commit f388aa8d13
1 changed files with 33 additions and 1 deletions

View File

@ -19,11 +19,24 @@ type AMQP struct {
Exchange string Exchange string
// Routing Key Tag // Routing Key Tag
RoutingTag string `toml:"routing_tag"` RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
// InfluxDB retention policy
RetentionPolicy string
// InfluxDB precision
Precision string
channel *amqp.Channel channel *amqp.Channel
sync.Mutex sync.Mutex
headers amqp.Table
} }
const (
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
DefaultPrecision = "s"
)
var sampleConfig = ` var sampleConfig = `
# AMQP url # AMQP url
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
@ -32,11 +45,25 @@ var sampleConfig = `
# Telegraf tag to use as a routing key # Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key # ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host" routing_tag = "host"
# InfluxDB retention policy
#retention_policy = "default"
# InfluxDB database
#database = "telegraf"
# InfluxDB precision
#precision = "s"
` `
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
q.headers = amqp.Table{
"precision": q.Precision,
"database": q.Database,
"retention_policy": q.RetentionPolicy,
}
connection, err := amqp.Dial(q.URL) connection, err := amqp.Dial(q.URL)
if err != nil { if err != nil {
return err return err
@ -112,6 +139,7 @@ func (q *AMQP) Write(points []*client.Point) error {
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
Headers: q.headers,
ContentType: "text/plain", ContentType: "text/plain",
Body: bytes.Join(buf, []byte("\n")), Body: bytes.Join(buf, []byte("\n")),
}) })
@ -124,6 +152,10 @@ func (q *AMQP) Write(points []*client.Point) error {
func init() { func init() {
outputs.Add("amqp", func() outputs.Output { outputs.Add("amqp", func() outputs.Output {
return &AMQP{} return &AMQP{
Database: DefaultDatabase,
Precision: DefaultPrecision,
RetentionPolicy: DefaultRetentionPolicy,
}
}) })
} }