telegraf/plugins/outputs/amqp/amqp.go

201 lines
4.2 KiB
Go

package amqp
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"sync"
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/streadway/amqp"
)
type AMQP struct {
// AMQP brokers to send metrics to
URL string
// AMQP exchange
Exchange string
// path to CA file
SslCa string
// path to host cert file
SslCert string
// path to cert key file
SslKey string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
// InfluxDB retention policy
RetentionPolicy string
// InfluxDB precision
Precision string
channel *amqp.Channel
sync.Mutex
headers amqp.Table
}
const (
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
DefaultPrecision = "s"
)
var sampleConfig = `
# AMQP url
url = "amqp://localhost:5672/influxdb"
# AMQP exchange
exchange = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# Use ssl
#ssl_ca = "/etc/telegraf/ca.pem"
#ssl_cert = "/etc/telegraf/cert.pem"
#ssl_key = "/etc/telegraf/key.pem"
# InfluxDB retention policy
#retention_policy = "default"
# InfluxDB database
#database = "telegraf"
# InfluxDB precision
#precision = "s"
`
func (q *AMQP) Connect() error {
q.Lock()
defer q.Unlock()
q.headers = amqp.Table{
"precision": q.Precision,
"database": q.Database,
"retention_policy": q.RetentionPolicy,
}
var connection *amqp.Connection
var err error
if q.SslCert != "" && q.SslKey != "" {
// make new tls config
cfg := new(tls.Config)
if q.SslCa != "" {
// create ca pool
cfg.RootCAs = x509.NewCertPool()
// add self-signed cert
if ca, err := ioutil.ReadFile(q.SslCa); err == nil {
cfg.RootCAs.AppendCertsFromPEM(ca)
} else {
log.Println(err)
}
}
if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil {
cfg.Certificates = append(cfg.Certificates, cert)
} else {
log.Println(err)
}
connection, err = amqp.DialTLS(q.URL, cfg)
} else {
connection, err = amqp.Dial(q.URL)
}
if err != nil {
return err
}
channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err)
}
err = channel.ExchangeDeclare(
q.Exchange, // name
"topic", // type
true, // durable
false, // delete when unused
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
}
q.channel = channel
go func() {
log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() {
log.Println(err)
time.Sleep(10 * time.Second)
}
}()
return nil
}
func (q *AMQP) Close() error {
return q.channel.Close()
}
func (q *AMQP) SampleConfig() string {
return sampleConfig
}
func (q *AMQP) Description() string {
return "Configuration for the AMQP server to send metrics to"
}
func (q *AMQP) Write(points []*client.Point) error {
q.Lock()
defer q.Unlock()
if len(points) == 0 {
return nil
}
var outbuf = make(map[string][][]byte)
for _, p := range points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to AMQP
var value, key string
value = p.String()
if q.RoutingTag != "" {
if h, ok := p.Tags()[q.RoutingTag]; ok {
key = h
}
}
outbuf[key] = append(outbuf[key], []byte(value))
}
for key, buf := range outbuf {
err := q.channel.Publish(
q.Exchange, // exchange
key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: q.headers,
ContentType: "text/plain",
Body: bytes.Join(buf, []byte("\n")),
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
}
}
return nil
}
func init() {
outputs.Add("amqp", func() outputs.Output {
return &AMQP{
Database: DefaultDatabase,
Precision: DefaultPrecision,
RetentionPolicy: DefaultRetentionPolicy,
}
})
}