diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index 84661aec3..e7701ad73 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -50,7 +50,7 @@ The following defaults are known to work with RabbitMQ: binding_key = "#" ## Maximum number of messages server should give to the worker. - prefetch_count = 50 + # prefetch_count = 50 ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index d39c995c4..1dde6fe23 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -100,11 +100,12 @@ func (a *AMQPConsumer) SampleConfig() string { ## AMQP queue name queue = "telegraf" + ## Binding Key binding_key = "#" ## Maximum number of messages server should give to the worker. - prefetch_count = 50 + # prefetch_count = 50 ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index ebc555142..817eea673 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -2,27 +2,15 @@ This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). -Metrics are written to a topic exchange using a routing key defined by: -1. The routing_key config defines a static value -2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found -3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used - -Metrics are grouped in batches by the final routing key. - -This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic. -To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange, -declare and bind a single queue with the same routing_key, and consume from the same queue in each worker. -To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind, -and consume from individual queues in each worker. +This plugin does not bind the exchange to a queue. For an introduction to AMQP see: - https://www.rabbitmq.com/tutorials/amqp-concepts.html - https://www.rabbitmq.com/getstarted.html ### Configuration: - ```toml -# Configuration for the AMQP server to send metrics to +# Publishes metrics to an AMQP broker [[outputs.amqp]] ## Broker to publish to. ## deprecated in 1.7; use the brokers option @@ -33,9 +21,10 @@ For an introduction to AMQP see: ## helpful for load balancing when not using a dedicated load balancer. brokers = ["amqp://localhost:5672/influxdb"] - ## Authentication credentials for the PLAIN auth_method. - # username = "" - # password = "" + ## Maximum messages to send over a connection. Once this is reached, the + ## connection is closed and a new connection is made. This can be helpful for + ## load balancing when not using a dedicated load balancer. + # max_messages = 0 ## Exchange to declare and publish to. exchange = "telegraf" @@ -44,36 +33,51 @@ For an introduction to AMQP see: # exchange_type = "topic" ## If true, exchange will be passively declared. - # exchange_passive = false + # exchange_declare_passive = false - ## Exchange durability can be either "transient" or "durable". - # exchange_durability = "durable" + ## If true, exchange will be created as a durable exchange. + # exchange_durable = true ## Additional exchange arguments. # exchange_args = { } # exchange_args = {"hash_propery" = "timestamp"} + ## Authentication credentials for the PLAIN auth_method. + # username = "" + # password = "" + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" - ## Topic routing key - # routing_key = "" - ## Telegraf tag to use as a routing key - ## ie, if this tag exists, its value will be used as the routing key - ## and override routing_key config even if defined - routing_tag = "host" - ## Delivery Mode controls if a published message is persistent - ## Valid options are "transient" and "persistent". default: "transient" - delivery_mode = "transient" - ## InfluxDB retention policy - # retention_policy = "default" - ## InfluxDB database + ## Metric tag to use as a routing key. + ## ie, if this tag exists, its value will be used as the routing key + # routing_tag = "host" + + ## Static routing key. Used when no routing_tag is set or as a fallback + ## when the tag specified in routing tag is not found. + # routing_key = "" + # routing_key = "telegraf" + + ## Delivery Mode controls if a published message is persistent. + ## One of "transient" or "persistent". + # delivery_mode = "transient" + + ## InfluxDB database added as a message header. + ## deprecated in 1.7; use the headers option # database = "telegraf" - ## Write timeout, formatted as a string. If not provided, will default - ## to 5s. 0s means no timeout (not recommended). + ## InfluxDB retention policy added as a message header + ## deprecated in 1.7; use the headers option + # retention_policy = "default" + + ## Static headers added to each published message. + # headers = { } + # headers = {"database" = "telegraf", "retention_policy" = "default"} + + ## Connection timeout. If not provided, will default to 5s. 0s means no + ## timeout (not recommended). # timeout = "5s" ## Optional TLS Config @@ -83,9 +87,25 @@ For an introduction to AMQP see: ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## If true use batch serialization format instead of line based delimiting. + ## Only applies to data formats which are not line based such as JSON. + ## Recommended to set to true. + # use_batch_format = false + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md - data_format = "influx" + # data_format = "influx" ``` + +#### Routing + +If `routing_tag` is set, and the tag is defined on the metric, the value of +the tag is used as the routing key. Otherwise the value of `routing_key` is +used directly. If both are unset the empty string is used. + +Exchange types that do not use a routing key, `direct` and `header`, always +use the empty string as the routing key. + +Metrics are published in batches based on the final routing key. diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index dfed5713b..faaa1027c 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -1,13 +1,10 @@ package amqp import ( - "errors" + "bytes" "fmt" "log" - "math/rand" - "net" "strings" - "sync" "time" "github.com/influxdata/telegraf" @@ -19,70 +16,58 @@ import ( "github.com/streadway/amqp" ) -type client struct { - conn *amqp.Connection - channel *amqp.Channel - headers amqp.Table -} - -type AMQP struct { - URL string `toml:"url"` // deprecated in 1.7; use brokers - Brokers []string `toml:"brokers"` - Username string `toml:"username"` - Password string `toml:"password"` - Exchange string `toml:"exchange"` - ExchangeType string `toml:"exchange_type"` - ExchangeDurability string `toml:"exchange_durability"` - ExchangePassive bool `toml:"exchange_passive"` - ExchangeArguments map[string]string `toml:"exchange_arguments"` - - // AMQP Auth method - AuthMethod string - // Routing Key (static) - RoutingKey string `toml:"routing_key"` - // Routing Key from Tag - RoutingTag string `toml:"routing_tag"` - // InfluxDB database - Database string - // InfluxDB retention policy - RetentionPolicy string - // InfluxDB precision (DEPRECATED) - Precision string - // Connection timeout - Timeout internal.Duration - // Delivery Mode controls if a published message is persistent - // Valid options are "transient" and "persistent". default: "transient" - DeliveryMode string - - tls.ClientConfig - - sync.Mutex - c *client - - deliveryMode uint8 - serializer serializers.Serializer -} +const ( + DefaultURL = "amqp://localhost:5672/influxdb" + DefaultAuthMethod = "PLAIN" + DefaultExchangeType = "topic" + DefaultRetentionPolicy = "default" + DefaultDatabase = "telegraf" +) type externalAuth struct{} func (a *externalAuth) Mechanism() string { return "EXTERNAL" } + func (a *externalAuth) Response() string { return fmt.Sprintf("\000") } -const ( - DefaultAuthMethod = "PLAIN" +type AMQP struct { + URL string `toml:"url"` // deprecated in 1.7; use brokers + Brokers []string `toml:"brokers"` + Exchange string `toml:"exchange"` + ExchangeType string `toml:"exchange_type"` + ExchangePassive bool `toml:"exchange_passive"` + ExchangeDurability string `toml:"exchange_durability"` + ExchangeArguments map[string]string `toml:"exchange_arguments"` + Username string `toml:"username"` + Password string `toml:"password"` + MaxMessages int `toml:"max_messages"` + AuthMethod string `toml:"auth_method"` + RoutingTag string `toml:"routing_tag"` + RoutingKey string `toml:"routing_key"` + DeliveryMode string `toml:"delivery_mode"` + Database string `toml:"database"` // deprecated in 1.7; use headers + RetentionPolicy string `toml:"retention_policy"` // deprecated in 1.7; use headers + Precision string `toml:"precision"` // deprecated; has no effect + Headers map[string]string `toml:"headers"` + Timeout internal.Duration `toml:"timeout"` + UseBatchFormat bool `toml:"use_batch_format"` + tls.ClientConfig - DefaultBroker = "amqp://localhost:5672/influxdb" + serializer serializers.Serializer + connect func(*ClientConfig) (Client, error) + client Client + config *ClientConfig + sentMessages int +} - DefaultExchangeType = "topic" - DefaultExchangeDurability = "durable" - - DefaultRetentionPolicy = "default" - DefaultDatabase = "telegraf" -) +type Client interface { + Publish(key string, body []byte) error + Close() error +} var sampleConfig = ` ## Broker to publish to. @@ -94,9 +79,10 @@ var sampleConfig = ` ## helpful for load balancing when not using a dedicated load balancer. brokers = ["amqp://localhost:5672/influxdb"] - ## Authentication credentials for the PLAIN auth_method. - # username = "" - # password = "" + ## Maximum messages to send over a connection. Once this is reached, the + ## connection is closed and a new connection is made. This can be helpful for + ## load balancing when not using a dedicated load balancer. + # max_messages = 0 ## Exchange to declare and publish to. exchange = "telegraf" @@ -105,36 +91,51 @@ var sampleConfig = ` # exchange_type = "topic" ## If true, exchange will be passively declared. - # exchange_passive = false + # exchange_declare_passive = false - ## Exchange durability can be either "transient" or "durable". - # exchange_durability = "durable" + ## If true, exchange will be created as a durable exchange. + # exchange_durable = true ## Additional exchange arguments. # exchange_args = { } # exchange_args = {"hash_propery" = "timestamp"} + ## Authentication credentials for the PLAIN auth_method. + # username = "" + # password = "" + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html # auth_method = "PLAIN" - ## Topic routing key - # routing_key = "" - ## Telegraf tag to use as a routing key - ## ie, if this tag exists, its value will be used as the routing key - ## and override routing_key config even if defined - routing_tag = "host" - ## Delivery Mode controls if a published message is persistent - ## Valid options are "transient" and "persistent". default: "transient" - delivery_mode = "transient" - ## InfluxDB retention policy - # retention_policy = "default" - ## InfluxDB database + ## Metric tag to use as a routing key. + ## ie, if this tag exists, its value will be used as the routing key + # routing_tag = "host" + + ## Static routing key. Used when no routing_tag is set or as a fallback + ## when the tag specified in routing tag is not found. + # routing_key = "" + # routing_key = "telegraf" + + ## Delivery Mode controls if a published message is persistent. + ## One of "transient" or "persistent". + # delivery_mode = "transient" + + ## InfluxDB database added as a message header. + ## deprecated in 1.7; use the headers option # database = "telegraf" - ## Write timeout, formatted as a string. If not provided, will default - ## to 5s. 0s means no timeout (not recommended). + ## InfluxDB retention policy added as a message header + ## deprecated in 1.7; use the headers option + # retention_policy = "default" + + ## Static headers added to each published message. + # headers = { } + # headers = {"database" = "telegraf", "retention_policy" = "default"} + + ## Connection timeout. If not provided, will default to 5s. 0s means no + ## timeout (not recommended). # timeout = "5s" ## Optional TLS Config @@ -144,40 +145,208 @@ var sampleConfig = ` ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## If true use batch serialization format instead of line based delimiting. + ## Only applies to data formats which are not line based such as JSON. + ## Recommended to set to true. + # use_batch_format = false + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md - data_format = "influx" + # data_format = "influx" ` -func (a *AMQP) SetSerializer(serializer serializers.Serializer) { - a.serializer = serializer +func (q *AMQP) SampleConfig() string { + return sampleConfig +} + +func (q *AMQP) Description() string { + return "Publishes metrics to an AMQP broker" +} + +func (q *AMQP) SetSerializer(serializer serializers.Serializer) { + q.serializer = serializer } func (q *AMQP) Connect() error { - switch q.DeliveryMode { - case "transient": - q.deliveryMode = amqp.Transient - break - case "persistent": - q.deliveryMode = amqp.Persistent - break - default: - q.deliveryMode = amqp.Transient - break + if q.config == nil { + config, err := q.makeClientConfig() + if err != nil { + return err + } + q.config = config } - headers := amqp.Table{ - "database": q.Database, - "retention_policy": q.RetentionPolicy, - } - - // make new tls config - tls, err := q.ClientConfig.TLSConfig() + client, err := q.connect(q.config) if err != nil { return err } + q.client = client + + return nil +} + +func (q *AMQP) Close() error { + if q.client != nil { + return q.client.Close() + } + return nil +} + +func (q *AMQP) routingKey(metric telegraf.Metric) string { + if q.RoutingTag != "" { + key, ok := metric.GetTag(q.RoutingTag) + if ok { + return key + } + } + return q.RoutingKey +} + +func (q *AMQP) Write(metrics []telegraf.Metric) error { + batches := make(map[string][]telegraf.Metric) + if q.ExchangeType == "direct" || q.ExchangeType == "header" { + // Since the routing_key is ignored for these exchange types send as a + // single batch. + batches[""] = metrics + } else { + for _, metric := range metrics { + routingKey := q.routingKey(metric) + if _, ok := batches[routingKey]; !ok { + batches[routingKey] = make([]telegraf.Metric, 0) + } + + batches[routingKey] = append(batches[routingKey], metric) + } + } + + first := true + for key, metrics := range batches { + body, err := q.serialize(metrics) + if err != nil { + return err + } + + err = q.publish(key, body) + if err != nil { + // If this is the first attempt to publish and the connection is + // closed, try to reconnect and retry once. + if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed { + first = false + q.client = nil + err := q.publish(key, body) + if err != nil { + return err + } + } else { + q.client = nil + return err + } + } + first = false + } + + if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 { + log.Printf("D! Output [amqp] sent MaxMessages; closing connection") + q.client = nil + } + + return nil +} + +func (q *AMQP) publish(key string, body []byte) error { + if q.client == nil { + client, err := q.connect(q.config) + if err != nil { + return err + } + q.sentMessages = 0 + q.client = client + } + + err := q.client.Publish(key, body) + if err != nil { + return err + } + q.sentMessages++ + return nil +} + +func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) { + if q.UseBatchFormat { + return q.serializer.SerializeBatch(metrics) + } else { + var buf bytes.Buffer + for _, metric := range metrics { + octets, err := q.serializer.Serialize(metric) + if err != nil { + return nil, err + } + _, err = buf.Write(octets) + if err != nil { + return nil, err + } + } + body := buf.Bytes() + return body, nil + } +} + +func (q *AMQP) makeClientConfig() (*ClientConfig, error) { + config := &ClientConfig{ + exchange: q.Exchange, + exchangeType: q.ExchangeType, + exchangePassive: q.ExchangePassive, + timeout: q.Timeout.Duration, + } + + switch q.ExchangeDurability { + case "transient": + config.exchangeDurable = false + default: + config.exchangeDurable = true + } + + config.brokers = q.Brokers + if len(config.brokers) == 0 { + config.brokers = []string{q.URL} + } + + switch q.DeliveryMode { + case "transient": + config.deliveryMode = amqp.Transient + case "persistent": + config.deliveryMode = amqp.Persistent + default: + config.deliveryMode = amqp.Transient + } + + if len(q.Headers) > 0 { + config.headers = make(amqp.Table, len(q.Headers)) + for k, v := range q.Headers { + config.headers[k] = v + } + } else { + // Copy deprecated fields into message header + config.headers = amqp.Table{ + "database": q.Database, + "retention_policy": q.RetentionPolicy, + } + } + + if len(q.ExchangeArguments) > 0 { + config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments)) + for k, v := range q.ExchangeArguments { + config.exchangeArguments[k] = v + } + } + + tlsConfig, err := q.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + config.tlsConfig = tlsConfig var auth []amqp.Authentication if strings.ToUpper(q.AuthMethod) == "EXTERNAL" { @@ -190,223 +359,25 @@ func (q *AMQP) Connect() error { }, } } + config.auth = auth - brokers := q.Brokers - if len(brokers) == 0 { - brokers = []string{q.URL} - } - - amqpConf := amqp.Config{ - TLSClientConfig: tls, - SASL: auth, // if nil, it will be PLAIN - Dial: func(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, q.Timeout.Duration) - }, - } - - var connection *amqp.Connection - p := rand.Perm(len(brokers)) - for _, n := range p { - broker := brokers[n] - log.Printf("D! Output [amqp] connecting to %q", broker) - conn, err := amqp.DialConfig(broker, amqpConf) - if err == nil { - connection = conn - log.Printf("D! Output [amqp] connected to %q", broker) - break - } - log.Printf("D! Output [amqp] error connecting to %q", broker) - } - - if connection == nil { - return errors.New("could not connect to any broker") - } - - channel, err := connection.Channel() - if err != nil { - return fmt.Errorf("Failed to open a channel: %s", err) - } - - var exchangeDurable = true - switch q.ExchangeDurability { - case "transient": - exchangeDurable = false - default: - exchangeDurable = true - } - - exchangeArgs := make(amqp.Table, len(q.ExchangeArguments)) - for k, v := range q.ExchangeArguments { - exchangeArgs[k] = v - } - - err = declareExchange( - channel, - q.Exchange, - q.ExchangeType, - q.ExchangePassive, - exchangeDurable, - exchangeArgs) - if err != nil { - return err - } - - q.setClient(&client{ - conn: connection, - channel: channel, - headers: headers, - }) - - go func() { - err := <-connection.NotifyClose(make(chan *amqp.Error)) - if err == nil { - return - } - - q.setClient(nil) - - log.Printf("I! Closing: %s", err) - log.Printf("I! Trying to reconnect") - for err := q.Connect(); err != nil; err = q.Connect() { - log.Println("E! ", err.Error()) - time.Sleep(10 * time.Second) - } - }() - return nil + return config, nil } -func declareExchange( - channel *amqp.Channel, - exchangeName string, - exchangeType string, - exchangePassive bool, - exchangeDurable bool, - exchangeArguments amqp.Table, -) error { - var err error - if exchangePassive { - err = channel.ExchangeDeclarePassive( - exchangeName, - exchangeType, - exchangeDurable, - false, // delete when unused - false, // internal - false, // no-wait - exchangeArguments, - ) - } else { - err = channel.ExchangeDeclare( - exchangeName, - exchangeType, - exchangeDurable, - false, // delete when unused - false, // internal - false, // no-wait - exchangeArguments, - ) - } - if err != nil { - return fmt.Errorf("error declaring exchange: %v", err) - } - return nil -} - -func (q *AMQP) Close() error { - c := q.getClient() - if c == nil { - return nil - } - - err := c.conn.Close() - if err != nil && err != amqp.ErrClosed { - log.Printf("E! Error closing AMQP connection: %s", err) - return err - } - return nil -} - -func (q *AMQP) SampleConfig() string { - return sampleConfig -} - -func (q *AMQP) Description() string { - return "Configuration for the AMQP server to send metrics to" -} - -func (q *AMQP) Write(metrics []telegraf.Metric) error { - if len(metrics) == 0 { - return nil - } - - c := q.getClient() - if c == nil { - return fmt.Errorf("connection is not open") - } - - outbuf := make(map[string][]byte) - - for _, metric := range metrics { - var key string - if q.RoutingKey != "" { - key = q.RoutingKey - } - if q.RoutingTag != "" { - if h, ok := metric.Tags()[q.RoutingTag]; ok { - key = h - } - } - - buf, err := q.serializer.Serialize(metric) - if err != nil { - return err - } - - outbuf[key] = append(outbuf[key], buf...) - } - - for key, buf := range outbuf { - // Note that since the channel is not in confirm mode, the absence of - // an error does not indicate successful delivery. - err := c.channel.Publish( - q.Exchange, // exchange - key, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - Headers: c.headers, - ContentType: "text/plain", - Body: buf, - DeliveryMode: q.deliveryMode, - }) - if err != nil { - return fmt.Errorf("Failed to send AMQP message: %s", err) - } - } - return nil -} - -func (q *AMQP) getClient() *client { - q.Lock() - defer q.Unlock() - return q.c -} - -func (q *AMQP) setClient(c *client) { - q.Lock() - q.c = c - q.Unlock() +func connect(config *ClientConfig) (Client, error) { + return Connect(config) } func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ - URL: DefaultBroker, - AuthMethod: DefaultAuthMethod, - ExchangeType: DefaultExchangeType, - ExchangeDurability: DefaultExchangeDurability, - Database: DefaultDatabase, - RetentionPolicy: DefaultRetentionPolicy, - Timeout: internal.Duration{Duration: time.Second * 5}, + URL: DefaultURL, + ExchangeType: DefaultExchangeType, + AuthMethod: DefaultAuthMethod, + Database: DefaultDatabase, + RetentionPolicy: DefaultRetentionPolicy, + Timeout: internal.Duration{Duration: time.Second * 5}, + connect: connect, } }) } diff --git a/plugins/outputs/amqp/amqp_test.go b/plugins/outputs/amqp/amqp_test.go index 66a082627..32a914528 100644 --- a/plugins/outputs/amqp/amqp_test.go +++ b/plugins/outputs/amqp/amqp_test.go @@ -2,30 +2,161 @@ package amqp import ( "testing" + "time" - "github.com/influxdata/telegraf/plugins/serializers" - "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/telegraf/internal" + "github.com/streadway/amqp" "github.com/stretchr/testify/require" ) -func TestConnectAndWrite(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } +type MockClient struct { + PublishF func(key string, body []byte) error + CloseF func() error - var url = "amqp://" + testutil.GetLocalHost() + ":5672/" - s, _ := serializers.NewInfluxSerializer() - q := &AMQP{ - URL: url, - Exchange: "telegraf_test", - serializer: s, - } + PublishCallCount int + CloseCallCount int - // Verify that we can connect to the AMQP broker - err := q.Connect() - require.NoError(t, err) - - // Verify that we can successfully write data to the amqp broker - err = q.Write(testutil.MockMetrics()) - require.NoError(t, err) + t *testing.T +} + +func (c *MockClient) Publish(key string, body []byte) error { + c.PublishCallCount++ + return c.PublishF(key, body) +} + +func (c *MockClient) Close() error { + c.CloseCallCount++ + return c.CloseF() +} + +func MockConnect(config *ClientConfig) (Client, error) { + return &MockClient{}, nil +} + +func NewMockClient() Client { + return &MockClient{ + PublishF: func(key string, body []byte) error { + return nil + }, + CloseF: func() error { + return nil + }, + } +} + +func TestConnect(t *testing.T) { + tests := []struct { + name string + output *AMQP + errFunc func(t *testing.T, output *AMQP, err error) + }{ + { + name: "defaults", + output: &AMQP{ + Brokers: []string{DefaultURL}, + ExchangeType: DefaultExchangeType, + ExchangeDurability: "durable", + AuthMethod: DefaultAuthMethod, + Database: DefaultDatabase, + RetentionPolicy: DefaultRetentionPolicy, + Timeout: internal.Duration{Duration: time.Second * 5}, + connect: func(config *ClientConfig) (Client, error) { + return NewMockClient(), nil + }, + }, + errFunc: func(t *testing.T, output *AMQP, err error) { + config := output.config + require.Equal(t, []string{DefaultURL}, config.brokers) + require.Equal(t, "", config.exchange) + require.Equal(t, "topic", config.exchangeType) + require.Equal(t, false, config.exchangePassive) + require.Equal(t, true, config.exchangeDurable) + require.Equal(t, amqp.Table(nil), config.exchangeArguments) + require.Equal(t, amqp.Table{ + "database": DefaultDatabase, + "retention_policy": DefaultRetentionPolicy, + }, config.headers) + require.Equal(t, amqp.Transient, config.deliveryMode) + require.NoError(t, err) + }, + }, + { + name: "headers overrides deprecated dbrp", + output: &AMQP{ + Headers: map[string]string{ + "foo": "bar", + }, + connect: func(config *ClientConfig) (Client, error) { + return NewMockClient(), nil + }, + }, + errFunc: func(t *testing.T, output *AMQP, err error) { + config := output.config + require.Equal(t, amqp.Table{ + "foo": "bar", + }, config.headers) + require.NoError(t, err) + }, + }, + { + name: "exchange args", + output: &AMQP{ + ExchangeArguments: map[string]string{ + "foo": "bar", + }, + connect: func(config *ClientConfig) (Client, error) { + return NewMockClient(), nil + }, + }, + errFunc: func(t *testing.T, output *AMQP, err error) { + config := output.config + require.Equal(t, amqp.Table{ + "foo": "bar", + }, config.exchangeArguments) + require.NoError(t, err) + }, + }, + { + name: "username password", + output: &AMQP{ + URL: "amqp://foo:bar@localhost", + Username: "telegraf", + Password: "pa$$word", + connect: func(config *ClientConfig) (Client, error) { + return NewMockClient(), nil + }, + }, + errFunc: func(t *testing.T, output *AMQP, err error) { + config := output.config + require.Equal(t, []amqp.Authentication{ + &amqp.PlainAuth{ + Username: "telegraf", + Password: "pa$$word", + }, + }, config.auth) + + require.NoError(t, err) + }, + }, + { + name: "url support", + output: &AMQP{ + URL: DefaultURL, + connect: func(config *ClientConfig) (Client, error) { + return NewMockClient(), nil + }, + }, + errFunc: func(t *testing.T, output *AMQP, err error) { + config := output.config + require.Equal(t, []string{DefaultURL}, config.brokers) + require.NoError(t, err) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.output.Connect() + tt.errFunc(t, tt.output, err) + }) + } } diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go new file mode 100644 index 000000000..ba4e45162 --- /dev/null +++ b/plugins/outputs/amqp/client.go @@ -0,0 +1,134 @@ +package amqp + +import ( + "crypto/tls" + "errors" + "fmt" + "log" + "math/rand" + "net" + "time" + + "github.com/streadway/amqp" +) + +type ClientConfig struct { + brokers []string + exchange string + exchangeType string + exchangePassive bool + exchangeDurable bool + exchangeArguments amqp.Table + headers amqp.Table + deliveryMode uint8 + tlsConfig *tls.Config + timeout time.Duration + auth []amqp.Authentication +} + +type client struct { + conn *amqp.Connection + channel *amqp.Channel + config *ClientConfig +} + +// Connect opens a connection to one of the brokers at random +func Connect(config *ClientConfig) (*client, error) { + client := &client{ + config: config, + } + + p := rand.Perm(len(config.brokers)) + for _, n := range p { + broker := config.brokers[n] + log.Printf("D! Output [amqp] connecting to %q", broker) + conn, err := amqp.DialConfig( + broker, amqp.Config{ + TLSClientConfig: config.tlsConfig, + SASL: config.auth, // if nil, it will be PLAIN taken from url + Dial: func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, config.timeout) + }, + }) + if err == nil { + client.conn = conn + log.Printf("D! Output [amqp] connected to %q", broker) + break + } + log.Printf("D! Output [amqp] error connecting to %q", broker) + } + + if client.conn == nil { + return nil, errors.New("could not connect to any broker") + } + + channel, err := client.conn.Channel() + if err != nil { + return nil, fmt.Errorf("error opening channel: %v", err) + } + client.channel = channel + + err = client.DeclareExchange() + if err != nil { + return nil, err + } + + return client, nil +} + +func (c *client) DeclareExchange() error { + var err error + if c.config.exchangePassive { + err = c.channel.ExchangeDeclarePassive( + c.config.exchange, + c.config.exchangeType, + c.config.exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + c.config.exchangeArguments, + ) + } else { + err = c.channel.ExchangeDeclare( + c.config.exchange, + c.config.exchangeType, + c.config.exchangeDurable, + false, // delete when unused + false, // internal + false, // no-wait + c.config.exchangeArguments, + ) + } + if err != nil { + return fmt.Errorf("error declaring exchange: %v", err) + } + return nil +} + +func (c *client) Publish(key string, body []byte) error { + // Note that since the channel is not in confirm mode, the absence of + // an error does not indicate successful delivery. + return c.channel.Publish( + c.config.exchange, // exchange + key, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: c.config.headers, + ContentType: "text/plain", + Body: body, + DeliveryMode: c.config.deliveryMode, + }) +} + +func (c *client) Close() error { + if c.conn == nil { + return nil + } + + err := c.conn.Close() + if err != nil && err != amqp.ErrClosed { + return err + } + return nil +}