Gather Elasticsearch nodes in goroutines, handle errors

fixes #464
This commit is contained in:
Cameron Sparr 2016-01-21 16:52:49 -07:00
parent e910a03af4
commit f2ab5f61f5
1 changed file with 36 additions and 13 deletions

View File

@ -2,8 +2,11 @@ package elasticsearch
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"sync"
"time" "time"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -93,21 +96,41 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the // Gather reads the stats from Elasticsearch and writes it to the
// Accumulator. // Accumulator.
func (e *Elasticsearch) Gather(acc inputs.Accumulator) error { func (e *Elasticsearch) Gather(acc inputs.Accumulator) error {
errChan := make(chan error, len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))
for _, serv := range e.Servers { for _, serv := range e.Servers {
var url string go func(s string, acc inputs.Accumulator) {
if e.Local { defer wg.Done()
url = serv + statsPathLocal var url string
} else { if e.Local {
url = serv + statsPath url = s + statsPathLocal
} } else {
if err := e.gatherNodeStats(url, acc); err != nil { url = s + statsPath
return err }
} if err := e.gatherNodeStats(url, acc); err != nil {
if e.ClusterHealth { errChan <- err
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc) return
} }
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
}
}(serv, acc)
} }
return nil
wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
} }
func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error { func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error {