diff --git a/CHANGELOG.md b/CHANGELOG.md index fe5631767..323b23915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ be deprecated eventually. - [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors. - [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs. - [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK. +- [#1678](https://github.com/influxdata/telegraf/pull/1678): Add AMQP consumer input plugin ### Bugfixes diff --git a/README.md b/README.md index 3dd06e93a..915c7b761 100644 --- a/README.md +++ b/README.md @@ -97,9 +97,10 @@ configuration options. ## Input Plugins -* [aws cloudwatch](./plugins/inputs/cloudwatch) * [aerospike](./plugins/inputs/aerospike) +* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq) * [apache](./plugins/inputs/apache) +* [aws cloudwatch](./plugins/inputs/cloudwatch) * [bcache](./plugins/inputs/bcache) * [cassandra](./plugins/inputs/cassandra) * [ceph](./plugins/inputs/ceph) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 924dffe3d..a9147c53e 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -2,6 +2,7 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" + _ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md new file mode 100644 index 000000000..85780700f --- /dev/null +++ b/plugins/inputs/amqp_consumer/README.md @@ -0,0 +1,47 @@ +# AMQP Consumer Input Plugin + +This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). + +Metrics are read from a topic exchange using the configured queue and binding_key. + +Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +For an introduction to AMQP see: +- https://www.rabbitmq.com/tutorials/amqp-concepts.html +- https://www.rabbitmq.com/getstarted.html + +The following defaults are known to work with RabbitMQ: + +```toml +# AMQP consumer plugin +[[inputs.amqp_consumer]] + ## AMQP url + url = "amqp://localhost:5672/influxdb" + ## AMQP exchange + exchange = "telegraf" + ## AMQP queue name + queue = "telegraf" + ## Binding Key + binding_key = "#" + + ## Controls how many messages the server will try to keep on the network + ## for consumers before receiving delivery acks. + #prefetch_count = 50 + + ## Auth method. PLAIN and EXTERNAL are supported. + ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as + ## described here: https://www.rabbitmq.com/plugins.html + # auth_method = "PLAIN" + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go new file mode 100644 index 000000000..6f12244aa --- /dev/null +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -0,0 +1,280 @@ +package amqp_consumer + +import ( + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/streadway/amqp" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +// AMQPConsumer is the top level struct for this plugin +type AMQPConsumer struct { + URL string + // AMQP exchange + Exchange string + // Queue Name + Queue string + // Binding Key + BindingKey string `toml:"binding_key"` + + // Controls how many messages the server will try to keep on the network + // for consumers before receiving delivery acks. + PrefetchCount int + + // AMQP Auth method + AuthMethod string + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + parser parsers.Parser + conn *amqp.Connection + wg *sync.WaitGroup +} + +type externalAuth struct{} + +func (a *externalAuth) Mechanism() string { + return "EXTERNAL" +} +func (a *externalAuth) Response() string { + return fmt.Sprintf("\000") +} + +const ( + DefaultAuthMethod = "PLAIN" + DefaultPrefetchCount = 50 +) + +func (a *AMQPConsumer) SampleConfig() string { + return ` + ## AMQP url + url = "amqp://localhost:5672/influxdb" + ## AMQP exchange + exchange = "telegraf" + ## AMQP queue name + queue = "telegraf" + ## Binding Key + binding_key = "#" + + ## Maximum number of messages server should give to the worker. + prefetch_count = 50 + + ## Auth method. PLAIN and EXTERNAL are supported + ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as + ## described here: https://www.rabbitmq.com/plugins.html + # auth_method = "PLAIN" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` +} + +func (a *AMQPConsumer) Description() string { + return "AMQP consumer plugin" +} + +func (a *AMQPConsumer) SetParser(parser parsers.Parser) { + a.parser = parser +} + +// All gathering is done in the Start function +func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { + // make new tls config + tls, err := internal.GetTLSConfig( + a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify) + if err != nil { + return nil, err + } + + // parse auth method + var sasl []amqp.Authentication // nil by default + + if strings.ToUpper(a.AuthMethod) == "EXTERNAL" { + sasl = []amqp.Authentication{&externalAuth{}} + } + + config := amqp.Config{ + TLSClientConfig: tls, + SASL: sasl, // if nil, it will be PLAIN + } + return &config, nil +} + +// Start satisfies the telegraf.ServiceInput interface +func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { + amqpConf, err := a.createConfig() + if err != nil { + return err + } + + msgs, err := a.connect(amqpConf) + if err != nil { + return err + } + + a.wg = &sync.WaitGroup{} + a.wg.Add(1) + go a.process(msgs, acc) + + go func() { + err := <-a.conn.NotifyClose(make(chan *amqp.Error)) + if err == nil { + return + } + + log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err) + for { + msgs, err := a.connect(amqpConf) + if err != nil { + log.Printf("E! AMQP connection failed: %s", err) + time.Sleep(10 * time.Second) + continue + } + + a.wg.Add(1) + go a.process(msgs, acc) + break + } + }() + + return nil +} + +func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { + conn, err := amqp.DialConfig(a.URL, *amqpConf) + if err != nil { + return nil, err + } + a.conn = conn + + ch, err := conn.Channel() + if err != nil { + return nil, fmt.Errorf("Failed to open a channel: %s", err) + } + + err = ch.ExchangeDeclare( + a.Exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, fmt.Errorf("Failed to declare an exchange: %s", err) + } + + q, err := ch.QueueDeclare( + a.Queue, // queue + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, fmt.Errorf("Failed to declare a queue: %s", err) + } + + err = ch.QueueBind( + q.Name, // queue + a.BindingKey, // binding-key + a.Exchange, // exchange + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("Failed to bind a queue: %s", err) + } + + err = ch.Qos( + a.PrefetchCount, + 0, // prefetch-size + false, // global + ) + if err != nil { + return nil, fmt.Errorf("Failed to set QoS: %s", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, fmt.Errorf("Failed establishing connection to queue: %s", err) + } + + log.Println("I! Started AMQP consumer") + return msgs, err +} + +// Read messages from queue and add them to the Accumulator +func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { + defer a.wg.Done() + for d := range msgs { + metrics, err := a.parser.Parse(d.Body) + if err != nil { + log.Printf("E! %v: error parsing metric - %v", err, string(d.Body)) + } else { + for _, m := range metrics { + acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + } + + d.Ack(false) + } + log.Printf("I! AMQP consumer queue closed") +} + +func (a *AMQPConsumer) Stop() { + err := a.conn.Close() + if err != nil && err != amqp.ErrClosed { + log.Printf("E! Error closing AMQP connection: %s", err) + return + } + a.wg.Wait() + log.Println("I! Stopped AMQP service") +} + +func init() { + inputs.Add("amqp_consumer", func() telegraf.Input { + return &AMQPConsumer{ + AuthMethod: DefaultAuthMethod, + PrefetchCount: DefaultPrefetchCount, + } + }) +} diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index d49c507b8..208ae934c 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -1,13 +1,18 @@ # AMQP Output Plugin -This plugin writes to a AMQP exchange using tag, defined in configuration file -as RoutingTag, as a routing key. +This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). + +Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key. If RoutingTag is empty, then empty routing key will be used. Metrics are grouped in batches by RoutingTag. This plugin doesn't bind exchange to a queue, so it should be done by consumer. +For an introduction to AMQP see: +- https://www.rabbitmq.com/tutorials/amqp-concepts.html +- https://www.rabbitmq.com/getstarted.html + ### Configuration: ``` @@ -18,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer. ## AMQP exchange exchange = "telegraf" ## Auth method. PLAIN and EXTERNAL are supported + ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as + ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" ## Telegraf tag to use as a routing key ## ie, if this tag exists, it's value will be used as the routing key diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index d86cac596..7b4c7d4c9 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -40,6 +40,7 @@ type AMQP struct { // Use SSL but skip chain & host verification InsecureSkipVerify bool + conn *amqp.Connection channel *amqp.Channel sync.Mutex headers amqp.Table @@ -68,6 +69,8 @@ var sampleConfig = ` ## AMQP exchange exchange = "telegraf" ## Auth method. PLAIN and EXTERNAL are supported + ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as + ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" ## Telegraf tag to use as a routing key ## ie, if this tag exists, it's value will be used as the routing key @@ -129,6 +132,8 @@ func (q *AMQP) Connect() error { if err != nil { return err } + q.conn = connection + channel, err := connection.Channel() if err != nil { return fmt.Errorf("Failed to open a channel: %s", err) @@ -148,7 +153,11 @@ func (q *AMQP) Connect() error { } q.channel = channel go func() { - log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error))) + err := <-connection.NotifyClose(make(chan *amqp.Error)) + if err == nil { + return + } + log.Printf("I! Closing: %s", err) log.Printf("I! Trying to reconnect") for err := q.Connect(); err != nil; err = q.Connect() { log.Println("E! ", err.Error()) @@ -160,7 +169,12 @@ func (q *AMQP) Connect() error { } func (q *AMQP) Close() error { - return q.channel.Close() + err := q.conn.Close() + if err != nil && err != amqp.ErrClosed { + log.Printf("E! Error closing AMQP connection: %s", err) + return err + } + return nil } func (q *AMQP) SampleConfig() string { @@ -207,7 +221,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { Body: buf, }) if err != nil { - return fmt.Errorf("FAILED to send amqp message: %s", err) + return fmt.Errorf("Failed to send AMQP message: %s", err) } } return nil