Add option to disconnect after a message limit is reached in amqp output
This commit is contained in:
committed by
Daniel Nelson
parent
5a38c152b6
commit
ddebe1aff6
134
plugins/outputs/amqp/client.go
Normal file
134
plugins/outputs/amqp/client.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type ClientConfig struct {
|
||||
brokers []string
|
||||
exchange string
|
||||
exchangeType string
|
||||
exchangePassive bool
|
||||
exchangeDurable bool
|
||||
exchangeArguments amqp.Table
|
||||
headers amqp.Table
|
||||
deliveryMode uint8
|
||||
tlsConfig *tls.Config
|
||||
timeout time.Duration
|
||||
auth []amqp.Authentication
|
||||
}
|
||||
|
||||
type client struct {
|
||||
conn *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
config *ClientConfig
|
||||
}
|
||||
|
||||
// Connect opens a connection to one of the brokers at random
|
||||
func Connect(config *ClientConfig) (*client, error) {
|
||||
client := &client{
|
||||
config: config,
|
||||
}
|
||||
|
||||
p := rand.Perm(len(config.brokers))
|
||||
for _, n := range p {
|
||||
broker := config.brokers[n]
|
||||
log.Printf("D! Output [amqp] connecting to %q", broker)
|
||||
conn, err := amqp.DialConfig(
|
||||
broker, amqp.Config{
|
||||
TLSClientConfig: config.tlsConfig,
|
||||
SASL: config.auth, // if nil, it will be PLAIN taken from url
|
||||
Dial: func(network, addr string) (net.Conn, error) {
|
||||
return net.DialTimeout(network, addr, config.timeout)
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
client.conn = conn
|
||||
log.Printf("D! Output [amqp] connected to %q", broker)
|
||||
break
|
||||
}
|
||||
log.Printf("D! Output [amqp] error connecting to %q", broker)
|
||||
}
|
||||
|
||||
if client.conn == nil {
|
||||
return nil, errors.New("could not connect to any broker")
|
||||
}
|
||||
|
||||
channel, err := client.conn.Channel()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening channel: %v", err)
|
||||
}
|
||||
client.channel = channel
|
||||
|
||||
err = client.DeclareExchange()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (c *client) DeclareExchange() error {
|
||||
var err error
|
||||
if c.config.exchangePassive {
|
||||
err = c.channel.ExchangeDeclarePassive(
|
||||
c.config.exchange,
|
||||
c.config.exchangeType,
|
||||
c.config.exchangeDurable,
|
||||
false, // delete when unused
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
c.config.exchangeArguments,
|
||||
)
|
||||
} else {
|
||||
err = c.channel.ExchangeDeclare(
|
||||
c.config.exchange,
|
||||
c.config.exchangeType,
|
||||
c.config.exchangeDurable,
|
||||
false, // delete when unused
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
c.config.exchangeArguments,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("error declaring exchange: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) Publish(key string, body []byte) error {
|
||||
// Note that since the channel is not in confirm mode, the absence of
|
||||
// an error does not indicate successful delivery.
|
||||
return c.channel.Publish(
|
||||
c.config.exchange, // exchange
|
||||
key, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
Headers: c.config.headers,
|
||||
ContentType: "text/plain",
|
||||
Body: body,
|
||||
DeliveryMode: c.config.deliveryMode,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *client) Close() error {
|
||||
if c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := c.conn.Close()
|
||||
if err != nil && err != amqp.ErrClosed {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user