// telegraf/Godeps/_workspace/src/gopkg.in/dancannon/gorethink.v1/cluster.go

package gorethink

import (
	"fmt"
	"math"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/cenkalti/backoff"
)

// A Cluster represents a connection to a RethinkDB cluster. A Cluster is
// created by the Session and should rarely be created manually.
//
// The cluster keeps track of all nodes in the cluster and, if requested, can
// listen for cluster changes and start tracking a new node if one appears.
// Currently nodes are removed from the pool if they become unhealthy (100
// failed queries); this should hopefully soon be replaced by a backoff
// system. A sketch showing how the tracked nodes can be inspected follows
// the struct definition.
type Cluster struct {
	opts *ConnectOpts

	mu        sync.RWMutex
	seeds     []Host  // Initial host nodes specified by user.
	nodes     []*Node // Active nodes in cluster.
	closed    bool
	nodeIndex int64
}
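
// clusterHealthSketch is an illustrative, hypothetical helper (not used by
// the driver itself) showing how the node tracking described above can be
// inspected: the healthy nodes are a subset of all tracked nodes.
func clusterHealthSketch(c *Cluster) {
	log.Debugf("cluster: %d nodes tracked, %d healthy",
		len(c.GetNodes()), len(c.GetHealthyNodes()))
}
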
// NewCluster creates a new cluster by connecting to the given hosts.
func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
	c := &Cluster{
		seeds: hosts,
		opts:  opts,
	}

	// Attempt to connect to the seed hosts; fail if no node could be reached.
	c.connectNodes(c.getSeeds())

	if !c.IsConnected() {
		return nil, ErrNoConnectionsStarted
	}

	if opts.DiscoverHosts {
		go c.discover()
	}

	return c, nil
}
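
// newClusterSketch is a hypothetical usage example: in practice the Session
// creates the Cluster, so direct construction like this should rarely be
// needed. The host address and port below are placeholder assumptions.
func newClusterSketch() (*Cluster, error) {
	opts := &ConnectOpts{DiscoverHosts: true}

	return NewCluster([]Host{NewHost("localhost", 28015)}, opts)
}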

// Query executes a ReQL query using the cluster to connect to the database.
func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
	node, err := c.GetRandomNode()
	if err != nil {
		return nil, err
	}

	return node.Query(q)
}

// Exec executes a ReQL query using the cluster to connect to the database,
// without returning a cursor for the results.
func (c *Cluster) Exec(q Query) (err error) {
	node, err := c.GetRandomNode()
	if err != nil {
		return err
	}

	return node.Exec(q)
}
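
// clusterQuerySketch is a hypothetical example contrasting Query, which
// returns a Cursor for reading results, with Exec, which runs a query
// without returning a cursor. The query mirrors the system-table query this
// file builds internally.
func clusterQuerySketch(c *Cluster) error {
	q := newQuery(
		DB("rethinkdb").Table("server_status"),
		map[string]interface{}{},
		c.opts,
	)

	cursor, err := c.Query(q)
	if err != nil {
		return err
	}

	// Exec would run the same kind of query without returning a cursor:
	// err = c.Exec(q)
	return cursor.Close()
}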

// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool.
func (c *Cluster) SetMaxIdleConns(n int) {
	for _, node := range c.GetNodes() {
		node.SetMaxIdleConns(n)
	}
}

// SetMaxOpenConns sets the maximum number of open connections to the database.
func (c *Cluster) SetMaxOpenConns(n int) {
	for _, node := range c.GetNodes() {
		node.SetMaxOpenConns(n)
	}
}
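
// poolTuningSketch is a hypothetical example of fanning pool limits out to
// every node; the limits shown are arbitrary placeholder values.
func poolTuningSketch(c *Cluster) {
	c.SetMaxIdleConns(5)  // at most 5 idle connections per node
	c.SetMaxOpenConns(20) // at most 20 open connections per node
}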

// Close closes the cluster.
func (c *Cluster) Close(optArgs ...CloseOpts) error {
	if c.closed {
		return nil
	}

	for _, node := range c.GetNodes() {
		err := node.Close(optArgs...)
		if err != nil {
			return err
		}
	}

	c.closed = true

	return nil
}

// discover attempts to find new nodes in the cluster using the current nodes.
func (c *Cluster) discover() {
	// Keep retrying with exponential backoff.
	b := backoff.NewExponentialBackOff()
	// Never finish retrying (the maximum interval is still 60s).
	b.MaxElapsedTime = 0

	// Keep trying to discover new nodes.
	for {
		backoff.RetryNotify(func() error {
			// If no nodes are known, try connecting to the seed hosts.
			if len(c.GetNodes()) == 0 {
				c.connectNodes(c.getSeeds())
			}

			return c.listenForNodeChanges()
		}, b, func(err error, wait time.Duration) {
			log.Debugf("Error discovering hosts %s, waiting %s", err, wait)
		})
	}
}
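
// boundedRetrySketch is a hypothetical example contrasting the unbounded
// retry loop above with the bounded form used below when connecting to a
// newly announced node: a non-zero MaxElapsedTime makes backoff.Retry give
// up once the total elapsed time exceeds the limit.
func boundedRetrySketch(op func() error) error {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 5 * time.Second // stop retrying after roughly 5s

	return backoff.Retry(op, b)
}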

// listenForNodeChanges listens for changes to node status using change feeds.
// This function will block until the query fails.
func (c *Cluster) listenForNodeChanges() error {
	// Start listening for changes from a random active node.
	node, err := c.GetRandomNode()
	if err != nil {
		return err
	}

	cursor, err := node.Query(newQuery(
		DB("rethinkdb").Table("server_status").Changes(),
		map[string]interface{}{},
		c.opts,
	))
	if err != nil {
		return err
	}

	// Keep reading node status updates from the changefeed.
	var result struct {
		NewVal nodeStatus `gorethink:"new_val"`
		OldVal nodeStatus `gorethink:"old_val"`
	}
	for cursor.Next(&result) {
		addr := fmt.Sprintf("%s:%d", result.NewVal.Network.Hostname, result.NewVal.Network.ReqlPort)
		addr = strings.ToLower(addr)

		switch result.NewVal.Status {
		case "connected":
			// Connect to the node using exponential backoff (give up after
			// waiting 5s) to give the node time to start up.
			b := backoff.NewExponentialBackOff()
			b.MaxElapsedTime = time.Second * 5

			backoff.Retry(func() error {
				node, err := c.connectNodeWithStatus(result.NewVal)
				if err == nil {
					if !c.nodeExists(node) {
						c.addNode(node)

						log.WithFields(logrus.Fields{
							"id":   node.ID,
							"host": node.Host.String(),
						}).Debug("Connected to node")
					}
				}

				return err
			}, b)
		}
	}

	return cursor.Err()
}
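
// changefeedSketch is a hypothetical example of the cursor-consumption
// pattern used above: Next decodes rows until the feed fails or is closed,
// after which Err reports why the loop stopped.
func changefeedSketch(cursor *Cursor) error {
	var row map[string]interface{}
	for cursor.Next(&row) {
		log.Debugf("change received: %v", row)
	}

	return cursor.Err()
}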

func (c *Cluster) connectNodes(hosts []Host) {
	// Add existing nodes to the map.
	nodeSet := map[string]*Node{}
	for _, node := range c.GetNodes() {
		nodeSet[node.ID] = node
	}

	// Attempt to connect to each seed host.
	for _, host := range hosts {
		conn, err := NewConnection(host.String(), c.opts)
		if err != nil {
			log.Warnf("Error creating connection %s", err.Error())
			continue
		}
		// Deferred closes run when connectNodes returns, so each seed
		// connection stays open until all hosts have been processed.
		defer conn.Close()

		_, cursor, err := conn.Query(newQuery(
			DB("rethinkdb").Table("server_status"),
			map[string]interface{}{},
			c.opts,
		))
		if err != nil {
			log.Warnf("Error fetching cluster status %s", err)
			continue
		}

		if c.opts.DiscoverHosts {
			var results []nodeStatus
			err = cursor.All(&results)
			if err != nil {
				continue
			}

			for _, result := range results {
				node, err := c.connectNodeWithStatus(result)
				if err == nil {
					if _, ok := nodeSet[node.ID]; !ok {
						log.WithFields(logrus.Fields{
							"id":   node.ID,
							"host": node.Host.String(),
						}).Debug("Connected to node")

						nodeSet[node.ID] = node
					}
				}
			}
		} else {
			node, err := c.connectNode(host.String(), []Host{host})
			if err == nil {
				if _, ok := nodeSet[node.ID]; !ok {
					log.WithFields(logrus.Fields{
						"id":   node.ID,
						"host": node.Host.String(),
					}).Debug("Connected to node")

					nodeSet[node.ID] = node
				}
			}
		}
	}

	nodes := []*Node{}
	for _, node := range nodeSet {
		nodes = append(nodes, node)
	}

	c.setNodes(nodes)
}

// connectNodeWithStatus connects to the node described by a server_status
// row, using each of its canonical addresses as an alias.
func (c *Cluster) connectNodeWithStatus(s nodeStatus) (*Node, error) {
	aliases := make([]Host, len(s.Network.CanonicalAddresses))
	for i, aliasAddress := range s.Network.CanonicalAddresses {
		aliases[i] = NewHost(aliasAddress.Host, int(s.Network.ReqlPort))
	}

	return c.connectNode(s.ID, aliases)
}

// connectNode tries each alias in order, keeping the first one for which a
// connection pool can be created and pinged.
func (c *Cluster) connectNode(id string, aliases []Host) (*Node, error) {
	var pool *Pool
	var err error

	for len(aliases) > 0 {
		pool, err = NewPool(aliases[0], c.opts)
		if err != nil {
			aliases = aliases[1:]
			continue
		}

		err = pool.Ping()
		if err != nil {
			aliases = aliases[1:]
			continue
		}

		// The ping was successful, so stop trying aliases.
		break
	}

	if err != nil {
		return nil, err
	}
	if len(aliases) == 0 {
		return nil, ErrInvalidNode
	}

	return newNode(id, aliases, c, pool), nil
}

// IsConnected returns true if the cluster has nodes and is not already closed.
func (c *Cluster) IsConnected() bool {
	c.mu.RLock()
	closed := c.closed
	c.mu.RUnlock()

	return (len(c.GetNodes()) > 0) && !closed
}

// AddSeeds adds new seed hosts to the cluster.
func (c *Cluster) AddSeeds(hosts []Host) {
	c.mu.Lock()
	c.seeds = append(c.seeds, hosts...)
	c.mu.Unlock()
}

func (c *Cluster) getSeeds() []Host {
	c.mu.RLock()
	seeds := c.seeds
	c.mu.RUnlock()

	return seeds
}

// GetRandomNode returns a random node in the cluster.
// TODO(dancannon) replace with hostpool
func (c *Cluster) GetRandomNode() (*Node, error) {
	if !c.IsConnected() {
		return nil, ErrClusterClosed
	}

	// Must copy the slice header for copy-on-write semantics to work.
	nodeArray := c.GetNodes()
	length := len(nodeArray)
	for i := 0; i < length; i++ {
		// nodeIndex is advanced atomically so that concurrent callers each
		// get a different index.
		index := int(math.Abs(float64(c.nextNodeIndex() % int64(length))))
		node := nodeArray[index]

		if !node.Closed() && node.IsHealthy() {
			return node, nil
		}
	}

	return nil, ErrNoConnections
}
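
// nodeIndexSketch is a hypothetical example isolating the selection
// arithmetic above: an atomically incremented counter taken modulo the node
// count yields a rotating index, and math.Abs keeps the index non-negative
// even if the counter wraps around to negative values. Assumes length > 0,
// as the loop above guarantees.
func nodeIndexSketch(counter int64, length int) int {
	return int(math.Abs(float64(counter % int64(length))))
}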

// GetNodes returns a list of all nodes in the cluster.
func (c *Cluster) GetNodes() []*Node {
	c.mu.RLock()
	nodes := c.nodes
	c.mu.RUnlock()

	return nodes
}

// GetHealthyNodes returns a list of all healthy nodes in the cluster.
func (c *Cluster) GetHealthyNodes() []*Node {
	c.mu.RLock()
	nodes := []*Node{}
	for _, node := range c.nodes {
		if node.IsHealthy() {
			nodes = append(nodes, node)
		}
	}
	c.mu.RUnlock()

	return nodes
}

func (c *Cluster) nodeExists(search *Node) bool {
	for _, node := range c.GetNodes() {
		if node.ID == search.ID {
			return true
		}
	}

	return false
}

func (c *Cluster) addNode(node *Node) {
	c.mu.Lock()
	c.nodes = append(c.nodes, node)
	c.mu.Unlock()
}

func (c *Cluster) addNodes(nodesToAdd []*Node) {
	c.mu.Lock()
	c.nodes = append(c.nodes, nodesToAdd...)
	c.mu.Unlock()
}

func (c *Cluster) setNodes(nodes []*Node) {
	c.mu.Lock()
	c.nodes = nodes
	c.mu.Unlock()
}

// removeNode removes the node with the given ID from the list of active
// nodes. Building the new slice with append avoids an out-of-range write if
// no node with that ID is present.
func (c *Cluster) removeNode(nodeID string) {
	nodes := c.GetNodes()
	nodeArray := make([]*Node, 0, len(nodes))

	// Keep the nodes that are not being removed.
	for _, n := range nodes {
		if n.ID != nodeID {
			nodeArray = append(nodeArray, n)
		}
	}

	c.setNodes(nodeArray)
}

func (c *Cluster) nextNodeIndex() int64 {
	return atomic.AddInt64(&c.nodeIndex, 1)
}