AMQP Consumer plugin (#1678)
This commit is contained in:
parent
53fb5608a8
commit
2fe161356b
|
@ -55,6 +55,7 @@ be deprecated eventually.
|
||||||
- [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors.
|
- [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors.
|
||||||
- [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs.
|
- [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs.
|
||||||
- [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK.
|
- [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK.
|
||||||
|
- [#1678](https://github.com/influxdata/telegraf/pull/1678): Add AMQP consumer input plugin
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -97,9 +97,10 @@ configuration options.
|
||||||
|
|
||||||
## Input Plugins
|
## Input Plugins
|
||||||
|
|
||||||
* [aws cloudwatch](./plugins/inputs/cloudwatch)
|
|
||||||
* [aerospike](./plugins/inputs/aerospike)
|
* [aerospike](./plugins/inputs/aerospike)
|
||||||
|
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
|
||||||
* [apache](./plugins/inputs/apache)
|
* [apache](./plugins/inputs/apache)
|
||||||
|
* [aws cloudwatch](./plugins/inputs/cloudwatch)
|
||||||
* [bcache](./plugins/inputs/bcache)
|
* [bcache](./plugins/inputs/bcache)
|
||||||
* [cassandra](./plugins/inputs/cassandra)
|
* [cassandra](./plugins/inputs/cassandra)
|
||||||
* [ceph](./plugins/inputs/ceph)
|
* [ceph](./plugins/inputs/ceph)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
# AMQP Consumer Input Plugin
|
||||||
|
|
||||||
|
This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
|
||||||
|
|
||||||
|
Metrics are read from a topic exchange using the configured queue and binding_key.
|
||||||
|
|
||||||
|
Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
|
||||||
|
|
||||||
|
For an introduction to AMQP see:
|
||||||
|
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
|
||||||
|
- https://www.rabbitmq.com/getstarted.html
|
||||||
|
|
||||||
|
The following defaults are known to work with RabbitMQ:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# AMQP consumer plugin
|
||||||
|
[[inputs.amqp_consumer]]
|
||||||
|
## AMQP url
|
||||||
|
url = "amqp://localhost:5672/influxdb"
|
||||||
|
## AMQP exchange
|
||||||
|
exchange = "telegraf"
|
||||||
|
## AMQP queue name
|
||||||
|
queue = "telegraf"
|
||||||
|
## Binding Key
|
||||||
|
binding_key = "#"
|
||||||
|
|
||||||
|
## Controls how many messages the server will try to keep on the network
|
||||||
|
## for consumers before receiving delivery acks.
|
||||||
|
#prefetch_count = 50
|
||||||
|
|
||||||
|
## Auth method. PLAIN and EXTERNAL are supported.
|
||||||
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
|
# auth_method = "PLAIN"
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use SSL but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Data format to output.
|
||||||
|
## Each data format has it's own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
|
data_format = "influx"
|
||||||
|
```
|
|
@ -0,0 +1,280 @@
|
||||||
|
package amqp_consumer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AMQPConsumer is the top level struct for this plugin
|
||||||
|
type AMQPConsumer struct {
|
||||||
|
URL string
|
||||||
|
// AMQP exchange
|
||||||
|
Exchange string
|
||||||
|
// Queue Name
|
||||||
|
Queue string
|
||||||
|
// Binding Key
|
||||||
|
BindingKey string `toml:"binding_key"`
|
||||||
|
|
||||||
|
// Controls how many messages the server will try to keep on the network
|
||||||
|
// for consumers before receiving delivery acks.
|
||||||
|
PrefetchCount int
|
||||||
|
|
||||||
|
// AMQP Auth method
|
||||||
|
AuthMethod string
|
||||||
|
// Path to CA file
|
||||||
|
SSLCA string `toml:"ssl_ca"`
|
||||||
|
// Path to host cert file
|
||||||
|
SSLCert string `toml:"ssl_cert"`
|
||||||
|
// Path to cert key file
|
||||||
|
SSLKey string `toml:"ssl_key"`
|
||||||
|
// Use SSL but skip chain & host verification
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
parser parsers.Parser
|
||||||
|
conn *amqp.Connection
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
type externalAuth struct{}
|
||||||
|
|
||||||
|
func (a *externalAuth) Mechanism() string {
|
||||||
|
return "EXTERNAL"
|
||||||
|
}
|
||||||
|
func (a *externalAuth) Response() string {
|
||||||
|
return fmt.Sprintf("\000")
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultAuthMethod = "PLAIN"
|
||||||
|
DefaultPrefetchCount = 50
|
||||||
|
)
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) SampleConfig() string {
|
||||||
|
return `
|
||||||
|
## AMQP url
|
||||||
|
url = "amqp://localhost:5672/influxdb"
|
||||||
|
## AMQP exchange
|
||||||
|
exchange = "telegraf"
|
||||||
|
## AMQP queue name
|
||||||
|
queue = "telegraf"
|
||||||
|
## Binding Key
|
||||||
|
binding_key = "#"
|
||||||
|
|
||||||
|
## Maximum number of messages server should give to the worker.
|
||||||
|
prefetch_count = 50
|
||||||
|
|
||||||
|
## Auth method. PLAIN and EXTERNAL are supported
|
||||||
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
|
# auth_method = "PLAIN"
|
||||||
|
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use SSL but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Data format to output.
|
||||||
|
## Each data format has it's own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
|
data_format = "influx"
|
||||||
|
`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) Description() string {
|
||||||
|
return "AMQP consumer plugin"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
|
||||||
|
a.parser = parser
|
||||||
|
}
|
||||||
|
|
||||||
|
// All gathering is done in the Start function
|
||||||
|
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
|
||||||
|
// make new tls config
|
||||||
|
tls, err := internal.GetTLSConfig(
|
||||||
|
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse auth method
|
||||||
|
var sasl []amqp.Authentication // nil by default
|
||||||
|
|
||||||
|
if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
|
||||||
|
sasl = []amqp.Authentication{&externalAuth{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
config := amqp.Config{
|
||||||
|
TLSClientConfig: tls,
|
||||||
|
SASL: sasl, // if nil, it will be PLAIN
|
||||||
|
}
|
||||||
|
return &config, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start satisfies the telegraf.ServiceInput interface
|
||||||
|
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
|
amqpConf, err := a.createConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := a.connect(amqpConf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.wg = &sync.WaitGroup{}
|
||||||
|
a.wg.Add(1)
|
||||||
|
go a.process(msgs, acc)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
|
||||||
|
for {
|
||||||
|
msgs, err := a.connect(amqpConf)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! AMQP connection failed: %s", err)
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
a.wg.Add(1)
|
||||||
|
go a.process(msgs, acc)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
|
||||||
|
conn, err := amqp.DialConfig(a.URL, *amqpConf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
a.conn = conn
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to open a channel: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.ExchangeDeclare(
|
||||||
|
a.Exchange, // name
|
||||||
|
"topic", // type
|
||||||
|
true, // durable
|
||||||
|
false, // auto-deleted
|
||||||
|
false, // internal
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
a.Queue, // queue
|
||||||
|
true, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.QueueBind(
|
||||||
|
q.Name, // queue
|
||||||
|
a.BindingKey, // binding-key
|
||||||
|
a.Exchange, // exchange
|
||||||
|
false,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.Qos(
|
||||||
|
a.PrefetchCount,
|
||||||
|
0, // prefetch-size
|
||||||
|
false, // global
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to set QoS: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
q.Name, // queue
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("I! Started AMQP consumer")
|
||||||
|
return msgs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read messages from queue and add them to the Accumulator
|
||||||
|
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
|
||||||
|
defer a.wg.Done()
|
||||||
|
for d := range msgs {
|
||||||
|
metrics, err := a.parser.Parse(d.Body)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
|
||||||
|
} else {
|
||||||
|
for _, m := range metrics {
|
||||||
|
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
d.Ack(false)
|
||||||
|
}
|
||||||
|
log.Printf("I! AMQP consumer queue closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQPConsumer) Stop() {
|
||||||
|
err := a.conn.Close()
|
||||||
|
if err != nil && err != amqp.ErrClosed {
|
||||||
|
log.Printf("E! Error closing AMQP connection: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.wg.Wait()
|
||||||
|
log.Println("I! Stopped AMQP service")
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("amqp_consumer", func() telegraf.Input {
|
||||||
|
return &AMQPConsumer{
|
||||||
|
AuthMethod: DefaultAuthMethod,
|
||||||
|
PrefetchCount: DefaultPrefetchCount,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -1,13 +1,18 @@
|
||||||
# AMQP Output Plugin
|
# AMQP Output Plugin
|
||||||
|
|
||||||
This plugin writes to a AMQP exchange using tag, defined in configuration file
|
This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
|
||||||
as RoutingTag, as a routing key.
|
|
||||||
|
Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
|
||||||
|
|
||||||
If RoutingTag is empty, then empty routing key will be used.
|
If RoutingTag is empty, then empty routing key will be used.
|
||||||
Metrics are grouped in batches by RoutingTag.
|
Metrics are grouped in batches by RoutingTag.
|
||||||
|
|
||||||
This plugin doesn't bind exchange to a queue, so it should be done by consumer.
|
This plugin doesn't bind exchange to a queue, so it should be done by consumer.
|
||||||
|
|
||||||
|
For an introduction to AMQP see:
|
||||||
|
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
|
||||||
|
- https://www.rabbitmq.com/getstarted.html
|
||||||
|
|
||||||
### Configuration:
|
### Configuration:
|
||||||
|
|
||||||
```
|
```
|
||||||
|
@ -18,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer.
|
||||||
## AMQP exchange
|
## AMQP exchange
|
||||||
exchange = "telegraf"
|
exchange = "telegraf"
|
||||||
## Auth method. PLAIN and EXTERNAL are supported
|
## Auth method. PLAIN and EXTERNAL are supported
|
||||||
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
# auth_method = "PLAIN"
|
# auth_method = "PLAIN"
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, it's value will be used as the routing key
|
## ie, if this tag exists, it's value will be used as the routing key
|
||||||
|
|
|
@ -40,6 +40,7 @@ type AMQP struct {
|
||||||
// Use SSL but skip chain & host verification
|
// Use SSL but skip chain & host verification
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
conn *amqp.Connection
|
||||||
channel *amqp.Channel
|
channel *amqp.Channel
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
headers amqp.Table
|
headers amqp.Table
|
||||||
|
@ -68,6 +69,8 @@ var sampleConfig = `
|
||||||
## AMQP exchange
|
## AMQP exchange
|
||||||
exchange = "telegraf"
|
exchange = "telegraf"
|
||||||
## Auth method. PLAIN and EXTERNAL are supported
|
## Auth method. PLAIN and EXTERNAL are supported
|
||||||
|
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
|
||||||
|
## described here: https://www.rabbitmq.com/plugins.html
|
||||||
# auth_method = "PLAIN"
|
# auth_method = "PLAIN"
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, it's value will be used as the routing key
|
## ie, if this tag exists, it's value will be used as the routing key
|
||||||
|
@ -129,6 +132,8 @@ func (q *AMQP) Connect() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
q.conn = connection
|
||||||
|
|
||||||
channel, err := connection.Channel()
|
channel, err := connection.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to open a channel: %s", err)
|
return fmt.Errorf("Failed to open a channel: %s", err)
|
||||||
|
@ -148,7 +153,11 @@ func (q *AMQP) Connect() error {
|
||||||
}
|
}
|
||||||
q.channel = channel
|
q.channel = channel
|
||||||
go func() {
|
go func() {
|
||||||
log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
|
err := <-connection.NotifyClose(make(chan *amqp.Error))
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Printf("I! Closing: %s", err)
|
||||||
log.Printf("I! Trying to reconnect")
|
log.Printf("I! Trying to reconnect")
|
||||||
for err := q.Connect(); err != nil; err = q.Connect() {
|
for err := q.Connect(); err != nil; err = q.Connect() {
|
||||||
log.Println("E! ", err.Error())
|
log.Println("E! ", err.Error())
|
||||||
|
@ -160,7 +169,12 @@ func (q *AMQP) Connect() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *AMQP) Close() error {
|
func (q *AMQP) Close() error {
|
||||||
return q.channel.Close()
|
err := q.conn.Close()
|
||||||
|
if err != nil && err != amqp.ErrClosed {
|
||||||
|
log.Printf("E! Error closing AMQP connection: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *AMQP) SampleConfig() string {
|
func (q *AMQP) SampleConfig() string {
|
||||||
|
@ -207,7 +221,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
Body: buf,
|
Body: buf,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("FAILED to send amqp message: %s", err)
|
return fmt.Errorf("Failed to send AMQP message: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue