AMQP Consumer plugin (#1678)

This commit is contained in:
Jack Zampolin 2017-03-03 10:24:50 -08:00 committed by Daniel Nelson
parent 1873abd248
commit 10744646db
7 changed files with 357 additions and 6 deletions

View File

@ -55,6 +55,7 @@ be deprecated eventually.
- [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors. - [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors.
- [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs. - [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs.
- [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK. - [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK.
- [#1678](https://github.com/influxdata/telegraf/pull/1678): Add AMQP consumer input plugin
### Bugfixes ### Bugfixes

View File

@ -97,9 +97,10 @@ configuration options.
## Input Plugins ## Input Plugins
* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [aerospike](./plugins/inputs/aerospike) * [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache) * [apache](./plugins/inputs/apache)
* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [bcache](./plugins/inputs/bcache) * [bcache](./plugins/inputs/bcache)
* [cassandra](./plugins/inputs/cassandra) * [cassandra](./plugins/inputs/cassandra)
* [ceph](./plugins/inputs/ceph) * [ceph](./plugins/inputs/ceph)

View File

@ -2,6 +2,7 @@ package all
import ( import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra" _ "github.com/influxdata/telegraf/plugins/inputs/cassandra"

View File

@ -0,0 +1,47 @@
# AMQP Consumer Input Plugin
This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
Metrics are read from a topic exchange using the configured queue and binding_key.
Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html
The following defaults are known to work with RabbitMQ:
```toml
# AMQP consumer plugin
[[inputs.amqp_consumer]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Controls how many messages the server will try to keep on the network
## for consumers before receiving delivery acks.
#prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported.
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```

View File

@ -0,0 +1,280 @@
package amqp_consumer
import (
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/streadway/amqp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string
// AMQP exchange
Exchange string
// Queue Name
Queue string
// Binding Key
BindingKey string `toml:"binding_key"`
// Controls how many messages the server will try to keep on the network
// for consumers before receiving delivery acks.
PrefetchCount int
// AMQP Auth method
AuthMethod string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
}
type externalAuth struct{}
func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return fmt.Sprintf("\000")
}
const (
DefaultAuthMethod = "PLAIN"
DefaultPrefetchCount = 50
)
func (a *AMQPConsumer) SampleConfig() string {
return `
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Maximum number of messages server should give to the worker.
prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`
}
func (a *AMQPConsumer) Description() string {
return "AMQP consumer plugin"
}
func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
a.parser = parser
}
// All gathering is done in the Start function
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
tls, err := internal.GetTLSConfig(
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
if err != nil {
return nil, err
}
// parse auth method
var sasl []amqp.Authentication // nil by default
if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
sasl = []amqp.Authentication{&externalAuth{}}
}
config := amqp.Config{
TLSClientConfig: tls,
SASL: sasl, // if nil, it will be PLAIN
}
return &config, nil
}
// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
amqpConf, err := a.createConfig()
if err != nil {
return err
}
msgs, err := a.connect(amqpConf)
if err != nil {
return err
}
a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)
go func() {
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
for {
msgs, err := a.connect(amqpConf)
if err != nil {
log.Printf("E! AMQP connection failed: %s", err)
time.Sleep(10 * time.Second)
continue
}
a.wg.Add(1)
go a.process(msgs, acc)
break
}
}()
return nil
}
func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
conn, err := amqp.DialConfig(a.URL, *amqpConf)
if err != nil {
return nil, err
}
a.conn = conn
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}
err = ch.ExchangeDeclare(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
}
q, err := ch.QueueDeclare(
a.Queue, // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}
err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}
err = ch.Qos(
a.PrefetchCount,
0, // prefetch-size
false, // global
)
if err != nil {
return nil, fmt.Errorf("Failed to set QoS: %s", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
}
log.Println("I! Started AMQP consumer")
return msgs, err
}
// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}
d.Ack(false)
}
log.Printf("I! AMQP consumer queue closed")
}
func (a *AMQPConsumer) Stop() {
err := a.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
return
}
a.wg.Wait()
log.Println("I! Stopped AMQP service")
}
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
AuthMethod: DefaultAuthMethod,
PrefetchCount: DefaultPrefetchCount,
}
})
}

View File

@ -1,13 +1,18 @@
# AMQP Output Plugin # AMQP Output Plugin
This plugin writes to a AMQP exchange using tag, defined in configuration file This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
as RoutingTag, as a routing key.
Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
If RoutingTag is empty, then empty routing key will be used. If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag. Metrics are grouped in batches by RoutingTag.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. This plugin doesn't bind exchange to a queue, so it should be done by consumer.
For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html
### Configuration: ### Configuration:
``` ```
@ -18,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer.
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported ## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN" # auth_method = "PLAIN"
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key ## ie, if this tag exists, it's value will be used as the routing key

View File

@ -40,6 +40,7 @@ type AMQP struct {
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
conn *amqp.Connection
channel *amqp.Channel channel *amqp.Channel
sync.Mutex sync.Mutex
headers amqp.Table headers amqp.Table
@ -68,6 +69,8 @@ var sampleConfig = `
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported ## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN" # auth_method = "PLAIN"
## Telegraf tag to use as a routing key ## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key ## ie, if this tag exists, it's value will be used as the routing key
@ -129,6 +132,8 @@ func (q *AMQP) Connect() error {
if err != nil { if err != nil {
return err return err
} }
q.conn = connection
channel, err := connection.Channel() channel, err := connection.Channel()
if err != nil { if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err) return fmt.Errorf("Failed to open a channel: %s", err)
@ -148,7 +153,11 @@ func (q *AMQP) Connect() error {
} }
q.channel = channel q.channel = channel
go func() { go func() {
log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error))) err := <-connection.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}
log.Printf("I! Closing: %s", err)
log.Printf("I! Trying to reconnect") log.Printf("I! Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() { for err := q.Connect(); err != nil; err = q.Connect() {
log.Println("E! ", err.Error()) log.Println("E! ", err.Error())
@ -160,7 +169,12 @@ func (q *AMQP) Connect() error {
} }
func (q *AMQP) Close() error { func (q *AMQP) Close() error {
return q.channel.Close() err := q.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
return err
}
return nil
} }
func (q *AMQP) SampleConfig() string { func (q *AMQP) SampleConfig() string {
@ -207,7 +221,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
Body: buf, Body: buf,
}) })
if err != nil { if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err) return fmt.Errorf("Failed to send AMQP message: %s", err)
} }
} }
return nil return nil