telegraf/plugins/outputs/amqp/amqp.go

406 lines
11 KiB
Go

package amqp
import (
"bytes"
"fmt"
"log"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/streadway/amqp"
)
const (
DefaultURL = "amqp://localhost:5672/influxdb"
DefaultAuthMethod = "PLAIN"
DefaultExchangeType = "topic"
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
)
type externalAuth struct{}
func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return fmt.Sprintf("\000")
}
type AMQP struct {
URL string `toml:"url"` // deprecated in 1.7; use brokers
Brokers []string `toml:"brokers"`
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
Username string `toml:"username"`
Password string `toml:"password"`
MaxMessages int `toml:"max_messages"`
AuthMethod string `toml:"auth_method"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
DeliveryMode string `toml:"delivery_mode"`
Database string `toml:"database"` // deprecated in 1.7; use headers
RetentionPolicy string `toml:"retention_policy"` // deprecated in 1.7; use headers
Precision string `toml:"precision"` // deprecated; has no effect
Headers map[string]string `toml:"headers"`
Timeout internal.Duration `toml:"timeout"`
UseBatchFormat bool `toml:"use_batch_format"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig
serializer serializers.Serializer
connect func(*ClientConfig) (Client, error)
client Client
config *ClientConfig
sentMessages int
encoder internal.ContentEncoder
}
type Client interface {
Publish(key string, body []byte) error
Close() error
}
var sampleConfig = `
## Broker to publish to.
## deprecated in 1.7; use the brokers option
# url = "amqp://localhost:5672/influxdb"
## Brokers to publish to. If multiple brokers are specified a random broker
## will be selected anytime a connection is established. This can be
## helpful for load balancing when not using a dedicated load balancer.
brokers = ["amqp://localhost:5672/influxdb"]
## Maximum messages to send over a connection. Once this is reached, the
## connection is closed and a new connection is made. This can be helpful for
## load balancing when not using a dedicated load balancer.
# max_messages = 0
## Exchange to declare and publish to.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_arguments = { }
# exchange_arguments = {"hash_propery" = "timestamp"}
## Authentication credentials for the PLAIN auth_method.
# username = ""
# password = ""
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Metric tag to use as a routing key.
## ie, if this tag exists, its value will be used as the routing key
# routing_tag = "host"
## Static routing key. Used when no routing_tag is set or as a fallback
## when the tag specified in routing tag is not found.
# routing_key = ""
# routing_key = "telegraf"
## Delivery Mode controls if a published message is persistent.
## One of "transient" or "persistent".
# delivery_mode = "transient"
## InfluxDB database added as a message header.
## deprecated in 1.7; use the headers option
# database = "telegraf"
## InfluxDB retention policy added as a message header
## deprecated in 1.7; use the headers option
# retention_policy = "default"
## Static headers added to each published message.
# headers = { }
# headers = {"database" = "telegraf", "retention_policy" = "default"}
## Connection timeout. If not provided, will default to 5s. 0s means no
## timeout (not recommended).
# timeout = "5s"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## If true use batch serialization format instead of line based delimiting.
## Only applies to data formats which are not line based such as JSON.
## Recommended to set to true.
# use_batch_format = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
`
func (q *AMQP) SampleConfig() string {
return sampleConfig
}
func (q *AMQP) Description() string {
return "Publishes metrics to an AMQP broker"
}
func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
q.serializer = serializer
}
func (q *AMQP) Connect() error {
if q.config == nil {
config, err := q.makeClientConfig()
if err != nil {
return err
}
q.config = config
}
var err error
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
if err != nil {
return err
}
q.client, err = q.connect(q.config)
if err != nil {
return err
}
return nil
}
func (q *AMQP) Close() error {
if q.client != nil {
return q.client.Close()
}
return nil
}
func (q *AMQP) routingKey(metric telegraf.Metric) string {
if q.RoutingTag != "" {
key, ok := metric.GetTag(q.RoutingTag)
if ok {
return key
}
}
return q.RoutingKey
}
func (q *AMQP) Write(metrics []telegraf.Metric) error {
batches := make(map[string][]telegraf.Metric)
if q.ExchangeType == "header" {
// Since the routing_key is ignored for this exchange type send as a
// single batch.
batches[""] = metrics
} else {
for _, metric := range metrics {
routingKey := q.routingKey(metric)
if _, ok := batches[routingKey]; !ok {
batches[routingKey] = make([]telegraf.Metric, 0)
}
batches[routingKey] = append(batches[routingKey], metric)
}
}
first := true
for key, metrics := range batches {
body, err := q.serialize(metrics)
if err != nil {
return err
}
body, err = q.encoder.Encode(body)
if err != nil {
return err
}
err = q.publish(key, body)
if err != nil {
// If this is the first attempt to publish and the connection is
// closed, try to reconnect and retry once.
if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
first = false
q.client = nil
err := q.publish(key, body)
if err != nil {
return err
}
} else {
q.client = nil
return err
}
}
first = false
}
if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
log.Printf("D! Output [amqp] sent MaxMessages; closing connection")
q.client.Close()
q.client = nil
}
return nil
}
func (q *AMQP) publish(key string, body []byte) error {
if q.client == nil {
client, err := q.connect(q.config)
if err != nil {
return err
}
q.sentMessages = 0
q.client = client
}
err := q.client.Publish(key, body)
if err != nil {
return err
}
q.sentMessages++
return nil
}
func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
if q.UseBatchFormat {
return q.serializer.SerializeBatch(metrics)
} else {
var buf bytes.Buffer
for _, metric := range metrics {
octets, err := q.serializer.Serialize(metric)
if err != nil {
log.Printf("D! [outputs.amqp] Could not serialize metric: %v", err)
continue
}
_, err = buf.Write(octets)
if err != nil {
return nil, err
}
}
body := buf.Bytes()
return body, nil
}
}
func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
config := &ClientConfig{
exchange: q.Exchange,
exchangeType: q.ExchangeType,
exchangePassive: q.ExchangePassive,
encoding: q.ContentEncoding,
timeout: q.Timeout.Duration,
}
switch q.ExchangeDurability {
case "transient":
config.exchangeDurable = false
default:
config.exchangeDurable = true
}
config.brokers = q.Brokers
if len(config.brokers) == 0 {
config.brokers = []string{q.URL}
}
switch q.DeliveryMode {
case "transient":
config.deliveryMode = amqp.Transient
case "persistent":
config.deliveryMode = amqp.Persistent
default:
config.deliveryMode = amqp.Transient
}
if len(q.Headers) > 0 {
config.headers = make(amqp.Table, len(q.Headers))
for k, v := range q.Headers {
config.headers[k] = v
}
} else {
// Copy deprecated fields into message header
config.headers = amqp.Table{
"database": q.Database,
"retention_policy": q.RetentionPolicy,
}
}
if len(q.ExchangeArguments) > 0 {
config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments))
for k, v := range q.ExchangeArguments {
config.exchangeArguments[k] = v
}
}
tlsConfig, err := q.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
config.tlsConfig = tlsConfig
var auth []amqp.Authentication
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
auth = []amqp.Authentication{&externalAuth{}}
} else if q.Username != "" || q.Password != "" {
auth = []amqp.Authentication{
&amqp.PlainAuth{
Username: q.Username,
Password: q.Password,
},
}
}
config.auth = auth
return config, nil
}
func connect(config *ClientConfig) (Client, error) {
return Connect(config)
}
func init() {
outputs.Add("amqp", func() telegraf.Output {
return &AMQP{
URL: DefaultURL,
ExchangeType: DefaultExchangeType,
AuthMethod: DefaultAuthMethod,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
Timeout: internal.Duration{Duration: time.Second * 5},
connect: connect,
}
})
}