diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index 9ca2783f1..1bfc2327b 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -2,6 +2,8 @@ package amqp import ( "fmt" + "log" + "sync" "time" "github.com/influxdb/influxdb/client" @@ -18,6 +20,7 @@ type AMQP struct { RoutingTag string `toml:"routing_tag"` channel *amqp.Channel + sync.Mutex } var sampleConfig = ` @@ -31,6 +34,8 @@ var sampleConfig = ` ` func (q *AMQP) Connect() error { + q.Lock() + defer q.Unlock() connection, err := amqp.Dial(q.URL) if err != nil { return err @@ -53,6 +58,15 @@ func (q *AMQP) Connect() error { return fmt.Errorf("Failed to declare an exchange: %s", err) } q.channel = channel + go func() { + log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error))) + log.Printf("Trying to reconnect") + for err := q.Connect(); err != nil; err = q.Connect() { + log.Println(err) + time.Sleep(10 * time.Second) + } + + }() return nil } @@ -69,6 +83,8 @@ func (q *AMQP) Description() string { } func (q *AMQP) Write(bp client.BatchPoints) error { + q.Lock() + defer q.Unlock() if len(bp.Points) == 0 { return nil }