package amqp_consumer import ( "context" "errors" "fmt" "math/rand" "strings" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/streadway/amqp" ) const ( defaultMaxUndeliveredMessages = 1000 ) type empty struct{} type semaphore chan empty // AMQPConsumer is the top level struct for this plugin type AMQPConsumer struct { URL string `toml:"url"` // deprecated in 1.7; use brokers Brokers []string `toml:"brokers"` Username string `toml:"username"` Password string `toml:"password"` Exchange string `toml:"exchange"` ExchangeType string `toml:"exchange_type"` ExchangeDurability string `toml:"exchange_durability"` ExchangePassive bool `toml:"exchange_passive"` ExchangeArguments map[string]string `toml:"exchange_arguments"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` // Queue Name Queue string `toml:"queue"` QueueDurability string `toml:"queue_durability"` QueuePassive bool `toml:"queue_passive"` // Binding Key BindingKey string `toml:"binding_key"` // Controls how many messages the server will try to keep on the network // for consumers before receiving delivery acks. PrefetchCount int // AMQP Auth method AuthMethod string tls.ClientConfig ContentEncoding string `toml:"content_encoding"` Log telegraf.Logger deliveries map[telegraf.TrackingID]amqp.Delivery parser parsers.Parser conn *amqp.Connection wg *sync.WaitGroup cancel context.CancelFunc decoder internal.ContentDecoder } type externalAuth struct{} func (a *externalAuth) Mechanism() string { return "EXTERNAL" } func (a *externalAuth) Response() string { return fmt.Sprintf("\000") } const ( DefaultAuthMethod = "PLAIN" DefaultBroker = "amqp://localhost:5672/influxdb" DefaultExchangeType = "topic" DefaultExchangeDurability = "durable" DefaultQueueDurability = "durable" DefaultPrefetchCount = 50 ) func (a *AMQPConsumer) SampleConfig() string { return ` ## Broker to consume from. ## deprecated in 1.7; use the brokers option # url = "amqp://localhost:5672/influxdb" ## Brokers to consume from. If multiple brokers are specified a random broker ## will be selected anytime a connection is established. This can be ## helpful for load balancing when not using a dedicated load balancer. brokers = ["amqp://localhost:5672/influxdb"] ## Authentication credentials for the PLAIN auth_method. # username = "" # password = "" ## Name of the exchange to declare. If unset, no exchange will be declared. exchange = "telegraf" ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash". # exchange_type = "topic" ## If true, exchange will be passively declared. # exchange_passive = false ## Exchange durability can be either "transient" or "durable". # exchange_durability = "durable" ## Additional exchange arguments. # exchange_arguments = { } # exchange_arguments = {"hash_property" = "timestamp"} ## AMQP queue name. queue = "telegraf" ## AMQP queue durability can be "transient" or "durable". queue_durability = "durable" ## If true, queue will be passively declared. # queue_passive = false ## A binding between the exchange and queue using this binding key is ## created. If unset, no binding is created. binding_key = "#" ## Maximum number of messages server should give to the worker. # prefetch_count = 50 ## Maximum messages to read from the broker that have not been written by an ## output. For best throughput set based on the number of metrics within ## each message and the size of the output's metric_batch_size. ## ## For example, if each message from the queue contains 10 metrics and the ## output metric_batch_size is 1000, setting this to 100 will ensure that a ## full batch is collected and the write is triggered immediately without ## waiting until the next flush_interval. # max_undelivered_messages = 1000 ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Content encoding for message payloads, can be set to "gzip" to or ## "identity" to apply no encoding. # content_encoding = "identity" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ` } func (a *AMQPConsumer) Description() string { return "AMQP consumer plugin" } func (a *AMQPConsumer) SetParser(parser parsers.Parser) { a.parser = parser } // All gathering is done in the Start function func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { return nil } func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { // make new tls config tls, err := a.ClientConfig.TLSConfig() if err != nil { return nil, err } var auth []amqp.Authentication if strings.ToUpper(a.AuthMethod) == "EXTERNAL" { auth = []amqp.Authentication{&externalAuth{}} } else if a.Username != "" || a.Password != "" { auth = []amqp.Authentication{ &amqp.PlainAuth{ Username: a.Username, Password: a.Password, }, } } config := amqp.Config{ TLSClientConfig: tls, SASL: auth, // if nil, it will be PLAIN } return &config, nil } // Start satisfies the telegraf.ServiceInput interface func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { amqpConf, err := a.createConfig() if err != nil { return err } a.decoder, err = internal.NewContentDecoder(a.ContentEncoding) if err != nil { return err } msgs, err := a.connect(amqpConf) if err != nil { return err } ctx, cancel := context.WithCancel(context.Background()) a.cancel = cancel a.wg = &sync.WaitGroup{} a.wg.Add(1) go func() { defer a.wg.Done() a.process(ctx, msgs, acc) }() go func() { for { err := <-a.conn.NotifyClose(make(chan *amqp.Error)) if err == nil { break } a.Log.Infof("Connection closed: %s; trying to reconnect", err) for { msgs, err := a.connect(amqpConf) if err != nil { a.Log.Errorf("AMQP connection failed: %s", err) time.Sleep(10 * time.Second) continue } a.wg.Add(1) go func() { defer a.wg.Done() a.process(ctx, msgs, acc) }() break } } }() return nil } func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { brokers := a.Brokers if len(brokers) == 0 { brokers = []string{a.URL} } p := rand.Perm(len(brokers)) for _, n := range p { broker := brokers[n] a.Log.Debugf("Connecting to %q", broker) conn, err := amqp.DialConfig(broker, *amqpConf) if err == nil { a.conn = conn a.Log.Debugf("Connected to %q", broker) break } a.Log.Debugf("Error connecting to %q", broker) } if a.conn == nil { return nil, errors.New("could not connect to any broker") } ch, err := a.conn.Channel() if err != nil { return nil, fmt.Errorf("Failed to open a channel: %s", err.Error()) } if a.Exchange != "" { var exchangeDurable = true switch a.ExchangeDurability { case "transient": exchangeDurable = false default: exchangeDurable = true } exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) for k, v := range a.ExchangeArguments { exchangeArgs[k] = v } err = declareExchange( ch, a.Exchange, a.ExchangeType, a.ExchangePassive, exchangeDurable, exchangeArgs) if err != nil { return nil, err } } q, err := declareQueue( ch, a.Queue, a.QueueDurability, a.QueuePassive) if err != nil { return nil, err } if a.BindingKey != "" { err = ch.QueueBind( q.Name, // queue a.BindingKey, // binding-key a.Exchange, // exchange false, nil, ) if err != nil { return nil, fmt.Errorf("Failed to bind a queue: %s", err) } } err = ch.Qos( a.PrefetchCount, 0, // prefetch-size false, // global ) if err != nil { return nil, fmt.Errorf("Failed to set QoS: %s", err) } msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // arguments ) if err != nil { return nil, fmt.Errorf("Failed establishing connection to queue: %s", err) } return msgs, err } func declareExchange( channel *amqp.Channel, exchangeName string, exchangeType string, exchangePassive bool, exchangeDurable bool, exchangeArguments amqp.Table, ) error { var err error if exchangePassive { err = channel.ExchangeDeclarePassive( exchangeName, exchangeType, exchangeDurable, false, // delete when unused false, // internal false, // no-wait exchangeArguments, ) } else { err = channel.ExchangeDeclare( exchangeName, exchangeType, exchangeDurable, false, // delete when unused false, // internal false, // no-wait exchangeArguments, ) } if err != nil { return fmt.Errorf("Error declaring exchange: %v", err) } return nil } func declareQueue( channel *amqp.Channel, queueName string, queueDurability string, queuePassive bool, ) (*amqp.Queue, error) { var queue amqp.Queue var err error var queueDurable = true switch queueDurability { case "transient": queueDurable = false default: queueDurable = true } if queuePassive { queue, err = channel.QueueDeclarePassive( queueName, // queue queueDurable, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) } else { queue, err = channel.QueueDeclare( queueName, // queue queueDurable, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) } if err != nil { return nil, fmt.Errorf("Error declaring queue: %v", err) } return &queue, nil } // Read messages from queue and add them to the Accumulator func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) { a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery) acc := ac.WithTracking(a.MaxUndeliveredMessages) sem := make(semaphore, a.MaxUndeliveredMessages) for { select { case <-ctx.Done(): return case track := <-acc.Delivered(): if a.onDelivery(track) { <-sem } case sem <- empty{}: select { case <-ctx.Done(): return case track := <-acc.Delivered(): if a.onDelivery(track) { <-sem <-sem } case d, ok := <-msgs: if !ok { return } err := a.onMessage(acc, d) if err != nil { acc.AddError(err) <-sem } } } } } func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error { onError := func() { // Discard the message from the queue; will never be able to process // this message. rejErr := d.Ack(false) if rejErr != nil { a.Log.Errorf("Unable to reject message: %d: %v", d.DeliveryTag, rejErr) a.conn.Close() } } body, err := a.decoder.Decode(d.Body) if err != nil { onError() return err } metrics, err := a.parser.Parse(body) if err != nil { onError() return err } id := acc.AddTrackingMetricGroup(metrics) a.deliveries[id] = d return nil } func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool { delivery, ok := a.deliveries[track.ID()] if !ok { // Added by a previous connection return false } if track.Delivered() { err := delivery.Ack(false) if err != nil { a.Log.Errorf("Unable to ack written delivery: %d: %v", delivery.DeliveryTag, err) a.conn.Close() } } else { err := delivery.Reject(false) if err != nil { a.Log.Errorf("Unable to reject failed delivery: %d: %v", delivery.DeliveryTag, err) a.conn.Close() } } delete(a.deliveries, track.ID()) return true } func (a *AMQPConsumer) Stop() { a.cancel() a.wg.Wait() err := a.conn.Close() if err != nil && err != amqp.ErrClosed { a.Log.Errorf("Error closing AMQP connection: %s", err) return } } func init() { inputs.Add("amqp_consumer", func() telegraf.Input { return &AMQPConsumer{ URL: DefaultBroker, AuthMethod: DefaultAuthMethod, ExchangeType: DefaultExchangeType, ExchangeDurability: DefaultExchangeDurability, QueueDurability: DefaultQueueDurability, PrefetchCount: DefaultPrefetchCount, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, } }) }