135 lines
2.9 KiB
Go
135 lines
2.9 KiB
Go
|
package amqp
|
||
|
|
||
|
import (
|
||
|
"crypto/tls"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"math/rand"
|
||
|
"net"
|
||
|
"time"
|
||
|
|
||
|
"github.com/streadway/amqp"
|
||
|
)
|
||
|
|
||
|
type ClientConfig struct {
|
||
|
brokers []string
|
||
|
exchange string
|
||
|
exchangeType string
|
||
|
exchangePassive bool
|
||
|
exchangeDurable bool
|
||
|
exchangeArguments amqp.Table
|
||
|
headers amqp.Table
|
||
|
deliveryMode uint8
|
||
|
tlsConfig *tls.Config
|
||
|
timeout time.Duration
|
||
|
auth []amqp.Authentication
|
||
|
}
|
||
|
|
||
|
type client struct {
|
||
|
conn *amqp.Connection
|
||
|
channel *amqp.Channel
|
||
|
config *ClientConfig
|
||
|
}
|
||
|
|
||
|
// Connect opens a connection to one of the brokers at random
|
||
|
func Connect(config *ClientConfig) (*client, error) {
|
||
|
client := &client{
|
||
|
config: config,
|
||
|
}
|
||
|
|
||
|
p := rand.Perm(len(config.brokers))
|
||
|
for _, n := range p {
|
||
|
broker := config.brokers[n]
|
||
|
log.Printf("D! Output [amqp] connecting to %q", broker)
|
||
|
conn, err := amqp.DialConfig(
|
||
|
broker, amqp.Config{
|
||
|
TLSClientConfig: config.tlsConfig,
|
||
|
SASL: config.auth, // if nil, it will be PLAIN taken from url
|
||
|
Dial: func(network, addr string) (net.Conn, error) {
|
||
|
return net.DialTimeout(network, addr, config.timeout)
|
||
|
},
|
||
|
})
|
||
|
if err == nil {
|
||
|
client.conn = conn
|
||
|
log.Printf("D! Output [amqp] connected to %q", broker)
|
||
|
break
|
||
|
}
|
||
|
log.Printf("D! Output [amqp] error connecting to %q", broker)
|
||
|
}
|
||
|
|
||
|
if client.conn == nil {
|
||
|
return nil, errors.New("could not connect to any broker")
|
||
|
}
|
||
|
|
||
|
channel, err := client.conn.Channel()
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error opening channel: %v", err)
|
||
|
}
|
||
|
client.channel = channel
|
||
|
|
||
|
err = client.DeclareExchange()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return client, nil
|
||
|
}
|
||
|
|
||
|
func (c *client) DeclareExchange() error {
|
||
|
var err error
|
||
|
if c.config.exchangePassive {
|
||
|
err = c.channel.ExchangeDeclarePassive(
|
||
|
c.config.exchange,
|
||
|
c.config.exchangeType,
|
||
|
c.config.exchangeDurable,
|
||
|
false, // delete when unused
|
||
|
false, // internal
|
||
|
false, // no-wait
|
||
|
c.config.exchangeArguments,
|
||
|
)
|
||
|
} else {
|
||
|
err = c.channel.ExchangeDeclare(
|
||
|
c.config.exchange,
|
||
|
c.config.exchangeType,
|
||
|
c.config.exchangeDurable,
|
||
|
false, // delete when unused
|
||
|
false, // internal
|
||
|
false, // no-wait
|
||
|
c.config.exchangeArguments,
|
||
|
)
|
||
|
}
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error declaring exchange: %v", err)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *client) Publish(key string, body []byte) error {
|
||
|
// Note that since the channel is not in confirm mode, the absence of
|
||
|
// an error does not indicate successful delivery.
|
||
|
return c.channel.Publish(
|
||
|
c.config.exchange, // exchange
|
||
|
key, // routing key
|
||
|
false, // mandatory
|
||
|
false, // immediate
|
||
|
amqp.Publishing{
|
||
|
Headers: c.config.headers,
|
||
|
ContentType: "text/plain",
|
||
|
Body: body,
|
||
|
DeliveryMode: c.config.deliveryMode,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (c *client) Close() error {
|
||
|
if c.conn == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
err := c.conn.Close()
|
||
|
if err != nil && err != amqp.ErrClosed {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|