diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 9b59537c0..304e0e3d7 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -2,8 +2,11 @@ package elasticsearch import ( "encoding/json" + "errors" "fmt" "net/http" + "strings" + "sync" "time" "github.com/influxdata/telegraf/internal" @@ -93,21 +96,41 @@ func (e *Elasticsearch) Description() string { // Gather reads the stats from Elasticsearch and writes it to the // Accumulator. func (e *Elasticsearch) Gather(acc inputs.Accumulator) error { + errChan := make(chan error, len(e.Servers)) + var wg sync.WaitGroup + wg.Add(len(e.Servers)) + for _, serv := range e.Servers { - var url string - if e.Local { - url = serv + statsPathLocal - } else { - url = serv + statsPath - } - if err := e.gatherNodeStats(url, acc); err != nil { - return err - } - if e.ClusterHealth { - e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc) - } + go func(s string, acc inputs.Accumulator) { + defer wg.Done() + var url string + if e.Local { + url = s + statsPathLocal + } else { + url = s + statsPath + } + if err := e.gatherNodeStats(url, acc); err != nil { + errChan <- err + return + } + if e.ClusterHealth { + e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc) + } + }(serv, acc) } - return nil + + wg.Wait() + close(errChan) + // Get all errors and return them as one giant error + errStrings := []string{} + for err := range errChan { + errStrings = append(errStrings, err.Error()) + } + + if len(errStrings) == 0 { + return nil + } + return errors.New(strings.Join(errStrings, "\n")) } func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error {