telegraf/plugins/inputs/amqp_consumer/amqp_consumer.go

387 lines
8.9 KiB
Go
Raw Normal View History

2017-03-03 18:24:50 +00:00
package amqp_consumer
import (
"errors"
2017-03-03 18:24:50 +00:00
"fmt"
"log"
"math/rand"
2017-03-03 18:24:50 +00:00
"strings"
"sync"
"time"
"github.com/streadway/amqp"
"github.com/influxdata/telegraf"
2018-05-04 23:33:23 +00:00
"github.com/influxdata/telegraf/internal/tls"
2017-03-03 18:24:50 +00:00
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string `toml:"url"` // deprecated in 1.7; use brokers
Brokers []string `toml:"brokers"`
Username string `toml:"username"`
Password string `toml:"password"`
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
2017-03-03 18:24:50 +00:00
// Queue Name
Queue string
// Binding Key
BindingKey string `toml:"binding_key"`
// Controls how many messages the server will try to keep on the network
// for consumers before receiving delivery acks.
PrefetchCount int
// AMQP Auth method
AuthMethod string
2018-05-04 23:33:23 +00:00
tls.ClientConfig
2017-03-03 18:24:50 +00:00
parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
}
// externalAuth implements the SASL EXTERNAL mechanism, selected when
// auth_method is "EXTERNAL".
type externalAuth struct{}

// Mechanism returns the SASL mechanism name announced to the broker.
func (a *externalAuth) Mechanism() string {
	return "EXTERNAL"
}

// Response returns the SASL response payload: a single NUL byte.
func (a *externalAuth) Response() string {
	// A plain literal is enough; there is nothing to format.
	return "\000"
}
// Defaults applied to a freshly created AMQPConsumer before the user's
// configuration is loaded (see init).
const (
	// DefaultAuthMethod is the default SASL mechanism.
	DefaultAuthMethod = "PLAIN"
	// DefaultBroker is the default value of the deprecated url option.
	DefaultBroker = "amqp://localhost:5672/influxdb"
	// DefaultExchangeType is the default exchange type.
	DefaultExchangeType = "topic"
	// DefaultExchangeDurability is the default exchange durability.
	DefaultExchangeDurability = "durable"
	// DefaultPrefetchCount is the default prefetch count passed to Qos.
	DefaultPrefetchCount = 50
)
// SampleConfig returns the example TOML configuration for this plugin.
func (a *AMQPConsumer) SampleConfig() string {
	return `
## Broker to consume from.
## deprecated in 1.7; use the brokers option
# url = "amqp://localhost:5672/influxdb"
## Brokers to consume from. If multiple brokers are specified a random broker
## will be selected anytime a connection is established. This can be
## helpful for load balancing when not using a dedicated load balancer.
brokers = ["amqp://localhost:5672/influxdb"]
## Authentication credentials for the PLAIN auth_method.
# username = ""
# password = ""
## Exchange to declare and consume from.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_arguments = { }
# exchange_arguments = {"hash-property" = "timestamp"}
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Maximum number of messages server should give to the worker.
# prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
}
// Description returns a one-line summary of the plugin.
func (a *AMQPConsumer) Description() string {
	const summary = "AMQP consumer plugin"
	return summary
}
// SetParser stores the parser that process uses to decode message bodies
// into metrics.
func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
	a.parser = parser
}
// Gather is a no-op that always returns nil. All gathering is done in the
// Start function, which hands deliveries to background goroutines.
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
	return nil
}
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
2018-05-04 23:33:23 +00:00
tls, err := a.ClientConfig.TLSConfig()
2017-03-03 18:24:50 +00:00
if err != nil {
return nil, err
}
var auth []amqp.Authentication
2017-03-03 18:24:50 +00:00
if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
auth = []amqp.Authentication{&externalAuth{}}
} else if a.Username != "" || a.Password != "" {
auth = []amqp.Authentication{
&amqp.PlainAuth{
Username: a.Username,
Password: a.Password,
},
}
2017-03-03 18:24:50 +00:00
}
config := amqp.Config{
TLSClientConfig: tls,
SASL: auth, // if nil, it will be PLAIN
2017-03-03 18:24:50 +00:00
}
return &config, nil
}
// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
amqpConf, err := a.createConfig()
if err != nil {
return err
}
msgs, err := a.connect(amqpConf)
if err != nil {
return err
}
a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)
go func() {
for {
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err == nil {
break
2017-03-03 18:24:50 +00:00
}
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
for {
msgs, err := a.connect(amqpConf)
if err != nil {
log.Printf("E! AMQP connection failed: %s", err)
time.Sleep(10 * time.Second)
continue
}
a.wg.Add(1)
go a.process(msgs, acc)
break
}
2017-03-03 18:24:50 +00:00
}
}()
return nil
}
func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
brokers := a.Brokers
if len(brokers) == 0 {
brokers = []string{a.URL}
}
p := rand.Perm(len(brokers))
for _, n := range p {
broker := brokers[n]
log.Printf("D! [amqp_consumer] connecting to %q", broker)
conn, err := amqp.DialConfig(broker, *amqpConf)
if err == nil {
a.conn = conn
log.Printf("D! [amqp_consumer] connected to %q", broker)
break
}
log.Printf("D! [amqp_consumer] error connecting to %q", broker)
}
if a.conn == nil {
return nil, errors.New("could not connect to any broker")
2017-03-03 18:24:50 +00:00
}
ch, err := a.conn.Channel()
2017-03-03 18:24:50 +00:00
if err != nil {
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}
var exchangeDurable = true
switch a.ExchangeDurability {
case "transient":
exchangeDurable = false
default:
exchangeDurable = true
}
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
for k, v := range a.ExchangeArguments {
exchangeArgs[k] = v
}
err = declareExchange(
ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable,
exchangeArgs)
2017-03-03 18:24:50 +00:00
if err != nil {
return nil, err
2017-03-03 18:24:50 +00:00
}
q, err := ch.QueueDeclare(
a.Queue, // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}
err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}
err = ch.Qos(
a.PrefetchCount,
0, // prefetch-size
false, // global
)
if err != nil {
return nil, fmt.Errorf("Failed to set QoS: %s", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
}
log.Println("I! Started AMQP consumer")
return msgs, err
}
// declareExchange declares the named exchange on channel, using the passive
// variant of the call when exchangePassive is set and the regular one
// otherwise. Both variants receive the same arguments: auto-delete, internal
// and no-wait are always false. Any failure is wrapped in an
// "error declaring exchange" error.
func declareExchange(
	channel *amqp.Channel,
	exchangeName string,
	exchangeType string,
	exchangePassive bool,
	exchangeDurable bool,
	exchangeArguments amqp.Table,
) error {
	// Both methods share one signature, so pick one and call it once.
	declare := channel.ExchangeDeclare
	if exchangePassive {
		declare = channel.ExchangeDeclarePassive
	}

	declareErr := declare(
		exchangeName,
		exchangeType,
		exchangeDurable,
		false, // delete when unused
		false, // internal
		false, // no-wait
		exchangeArguments,
	)
	if declareErr == nil {
		return nil
	}
	return fmt.Errorf("error declaring exchange: %v", declareErr)
}
2017-03-03 18:24:50 +00:00
// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}
d.Ack(false)
}
log.Printf("I! AMQP consumer queue closed")
}
// Stop closes the broker connection and waits for the process goroutines to
// finish.
//
// If closing fails with anything other than amqp.ErrClosed, the error is
// logged and Stop returns without waiting. Stop tolerates being called when
// no connection was ever established (a.conn or a.wg still nil) instead of
// panicking on a nil dereference.
func (a *AMQPConsumer) Stop() {
	if a.conn != nil {
		err := a.conn.Close()
		if err != nil && err != amqp.ErrClosed {
			log.Printf("E! Error closing AMQP connection: %s", err)
			return
		}
	}
	if a.wg != nil {
		a.wg.Wait()
	}
	log.Println("I! Stopped AMQP service")
}
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
URL: DefaultBroker,
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
PrefetchCount: DefaultPrefetchCount,
2017-03-03 18:24:50 +00:00
}
})
}