New object: ErrChan for concurrent err handling

This commit is contained in:
Cameron Sparr
2016-06-02 12:34:03 +01:00
parent 1aabd38eb2
commit 2c448e22e1
6 changed files with 75 additions and 54 deletions

View File

@@ -2,14 +2,13 @@ package elasticsearch
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan := make(chan error, len(e.Servers))
errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))
@@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
url = s + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err
errChan.C <- err
return
}
if e.ClusterHealth {
@@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
return errChan.Error()
}
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {