telegraf/outputs/amqp/amqp.go

127 lines
2.5 KiB
Go
Raw Normal View History

2015-09-15 18:16:53 +00:00
package amqp
import (
"fmt"
"time"
2015-09-15 18:16:53 +00:00
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/streadway/amqp"
)
type AMQP struct {
// AMQP brokers to send metrics to
URL string
// AMQP exchange
Exchange string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
2015-09-15 18:16:53 +00:00
channel *amqp.Channel
}
var sampleConfig = `
# AMQP url
url = "amqp://localhost:5672/influxdb"
# AMQP exchange
exchange = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
2015-09-15 18:16:53 +00:00
`
func (q *AMQP) Connect() error {
connection, err := amqp.Dial(q.URL)
if err != nil {
return err
}
channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err)
}
err = channel.ExchangeDeclare(
q.Exchange, // name
"topic", // type
true, // durable
false, // delete when unused
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
}
q.channel = channel
return nil
}
func (q *AMQP) Close() error {
return q.channel.Close()
}
func (q *AMQP) SampleConfig() string {
return sampleConfig
}
func (q *AMQP) Description() string {
return "Configuration for the AMQP server to send metrics to"
}
func (q *AMQP) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
var zero_time time.Time
2015-09-15 18:16:53 +00:00
for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to AMQP
2015-09-15 18:16:53 +00:00
var value, key string
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if p.Time == zero_time {
if bp.Time == zero_time {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
2015-09-15 18:16:53 +00:00
value = p.MarshalString()
}
if q.RoutingTag != "" {
if h, ok := p.Tags[q.RoutingTag]; ok {
key = h
}
2015-09-15 18:16:53 +00:00
}
err := q.channel.Publish(
q.Exchange, // exchange
key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(value),
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
2015-09-15 18:16:53 +00:00
}
}
return nil
}
func init() {
outputs.Add("amqp", func() outputs.Output {
return &AMQP{}
})
}