package amqp import ( "fmt" "log" "net" "strings" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" "github.com/streadway/amqp" ) type client struct { conn *amqp.Connection channel *amqp.Channel headers amqp.Table } type AMQP struct { // AMQP brokers to send metrics to URL string // AMQP exchange Exchange string // AMQP Auth method AuthMethod string // Routing Key Tag RoutingTag string `toml:"routing_tag"` // InfluxDB database Database string // InfluxDB retention policy RetentionPolicy string // InfluxDB precision (DEPRECATED) Precision string // Connection timeout Timeout internal.Duration // Delivery Mode controls if a published message is persistent // Valid options are "transient" and "persistent". default: "transient" DeliveryMode string // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file SSLCert string `toml:"ssl_cert"` // Path to cert key file SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool sync.Mutex c *client deliveryMode uint8 serializer serializers.Serializer } type externalAuth struct{} func (a *externalAuth) Mechanism() string { return "EXTERNAL" } func (a *externalAuth) Response() string { return fmt.Sprintf("\000") } const ( DefaultAuthMethod = "PLAIN" DefaultRetentionPolicy = "default" DefaultDatabase = "telegraf" ) var sampleConfig = ` ## AMQP url url = "amqp://localhost:5672/influxdb" ## AMQP exchange exchange = "telegraf" ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" ## Delivery Mode controls if a published message is persistent ## Valid options are "transient" and "persistent". default: "transient" delivery_mode = "transient" ## InfluxDB retention policy # retention_policy = "default" ## InfluxDB database # database = "telegraf" ## Write timeout, formatted as a string. If not provided, will default ## to 5s. 0s means no timeout (not recommended). # timeout = "5s" ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification # insecure_skip_verify = false ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" ` func (a *AMQP) SetSerializer(serializer serializers.Serializer) { a.serializer = serializer } func (q *AMQP) Connect() error { switch q.DeliveryMode { case "transient": q.deliveryMode = amqp.Transient break case "persistent": q.deliveryMode = amqp.Persistent break default: q.deliveryMode = amqp.Transient break } headers := amqp.Table{ "database": q.Database, "retention_policy": q.RetentionPolicy, } var connection *amqp.Connection // make new tls config tls, err := internal.GetTLSConfig( q.SSLCert, q.SSLKey, q.SSLCA, q.InsecureSkipVerify) if err != nil { return err } // parse auth method var sasl []amqp.Authentication // nil by default if strings.ToUpper(q.AuthMethod) == "EXTERNAL" { sasl = []amqp.Authentication{&externalAuth{}} } amqpConf := amqp.Config{ TLSClientConfig: tls, SASL: sasl, // if nil, it will be PLAIN Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, q.Timeout.Duration) }, } connection, err = amqp.DialConfig(q.URL, amqpConf) if err != nil { return err } channel, err := connection.Channel() if err != nil { return fmt.Errorf("Failed to open a channel: %s", err) } err = channel.ExchangeDeclare( q.Exchange, // name "topic", // type true, // durable false, // delete when unused false, // internal false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("Failed to declare an exchange: %s", err) } q.setClient(&client{ conn: connection, channel: channel, headers: headers, }) go func() { err := <-connection.NotifyClose(make(chan *amqp.Error)) if err == nil { return } q.setClient(nil) log.Printf("I! Closing: %s", err) log.Printf("I! Trying to reconnect") for err := q.Connect(); err != nil; err = q.Connect() { log.Println("E! ", err.Error()) time.Sleep(10 * time.Second) } }() return nil } func (q *AMQP) Close() error { c := q.getClient() if c == nil { return nil } err := c.conn.Close() if err != nil && err != amqp.ErrClosed { log.Printf("E! Error closing AMQP connection: %s", err) return err } return nil } func (q *AMQP) SampleConfig() string { return sampleConfig } func (q *AMQP) Description() string { return "Configuration for the AMQP server to send metrics to" } func (q *AMQP) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } c := q.getClient() if c == nil { return fmt.Errorf("connection is not open") } outbuf := make(map[string][]byte) for _, metric := range metrics { var key string if q.RoutingTag != "" { if h, ok := metric.Tags()[q.RoutingTag]; ok { key = h } } buf, err := q.serializer.Serialize(metric) if err != nil { return err } outbuf[key] = append(outbuf[key], buf...) } for key, buf := range outbuf { // Note that since the channel is not in confirm mode, the absence of // an error does not indicate successful delivery. err := c.channel.Publish( q.Exchange, // exchange key, // routing key false, // mandatory false, // immediate amqp.Publishing{ Headers: c.headers, ContentType: "text/plain", Body: buf, DeliveryMode: q.deliveryMode, }) if err != nil { return fmt.Errorf("Failed to send AMQP message: %s", err) } } return nil } func (q *AMQP) getClient() *client { q.Lock() defer q.Unlock() return q.c } func (q *AMQP) setClient(c *client) { q.Lock() q.c = c q.Unlock() } func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ AuthMethod: DefaultAuthMethod, Database: DefaultDatabase, RetentionPolicy: DefaultRetentionPolicy, Timeout: internal.Duration{Duration: time.Second * 5}, } }) }