From bf4b009a217a46ef762ee8e4d5219ce9b007b508 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 2 Jun 2016 12:34:03 +0100 Subject: [PATCH] New object: ErrChan for concurrent err handling --- internal/errchan/errchan.go | 37 +++++++++++++++++++ plugins/inputs/cloudwatch/cloudwatch.go | 20 +++++----- plugins/inputs/elasticsearch/elasticsearch.go | 19 ++-------- plugins/inputs/exec/exec.go | 15 ++------ plugins/inputs/haproxy/haproxy.go | 16 ++++---- plugins/inputs/rabbitmq/rabbitmq.go | 22 +++++------ 6 files changed, 75 insertions(+), 54 deletions(-) create mode 100644 internal/errchan/errchan.go diff --git a/internal/errchan/errchan.go b/internal/errchan/errchan.go new file mode 100644 index 000000000..467a0f4a7 --- /dev/null +++ b/internal/errchan/errchan.go @@ -0,0 +1,37 @@ +package errchan + +import ( + "fmt" + "strings" +) + +type ErrChan struct { + C chan error +} + +// New returns an error channel of max length 'n' +// errors can be sent to the ErrChan.C channel, and will be returned when +// ErrChan.Error() is called. +func New(n int) *ErrChan { + return &ErrChan{ + C: make(chan error, n), + } +} + +// Error closes the ErrChan.C channel and returns an error if there are any +// non-nil errors, otherwise returns nil. +func (e *ErrChan) Error() error { + close(e.C) + + var out string + for err := range e.C { + if err != nil { + out += "[" + err.Error() + "], " + } + } + + if out != "" { + return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", ")) + } + return nil +} diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index 1bd2d5c07..f3019eb4b 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -3,6 +3,7 @@ package cloudwatch import ( "fmt" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -12,6 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -166,7 +168,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { } metricCount := len(metrics) - var errChan = make(chan error, metricCount) + errChan := errchan.New(metricCount) now := time.Now() @@ -175,18 +177,18 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html lmtr := limiter.NewRateLimiter(10, time.Second) defer lmtr.Stop() + var wg sync.WaitGroup + wg.Add(len(metrics)) for _, m := range metrics { <-lmtr.C - go c.gatherMetric(acc, m, now, errChan) + go func(inm *cloudwatch.Metric) { + defer wg.Done() + c.gatherMetric(acc, inm, now, errChan.C) + }(m) } + wg.Wait() - for i := 1; i <= metricCount; i++ { - err := <-errChan - if err != nil { - return err - } - } - return nil + return errChan.Error() } func init() { diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 32bd58516..3839f6df6 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -2,14 +2,13 @@ package elasticsearch import ( "encoding/json" - "errors" "fmt" "net/http" - "strings" "sync" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) @@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string { // Gather reads the stats from Elasticsearch and writes it to the // Accumulator. func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { - errChan := make(chan error, len(e.Servers)) + errChan := errchan.New(len(e.Servers)) var wg sync.WaitGroup wg.Add(len(e.Servers)) @@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { url = s + statsPath } if err := e.gatherNodeStats(url, acc); err != nil { - errChan <- err + errChan.C <- err return } if e.ClusterHealth { @@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } wg.Wait() - close(errChan) - // Get all errors and return them as one giant error - errStrings := []string{} - for err := range errChan { - errStrings = append(errStrings, err.Error()) - } - - if len(errStrings) == 0 { - return nil - } - return errors.New(strings.Join(errStrings, "\n")) + return errChan.Error() } func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 1f5f12203..415831960 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { } } - e.errChan = make(chan error, len(commands)) + errChan := errchan.New(len(commands)) + e.errChan = errChan.C e.wg.Add(len(commands)) for _, command := range commands { go e.ProcessCommand(command, acc) } e.wg.Wait() - - select { - default: - close(e.errChan) - return nil - case err := <-e.errChan: - close(e.errChan) - return err - } - + return errChan.Error() } func init() { diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index 396e3c934..0a0b3da82 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -3,10 +3,7 @@ package haproxy import ( "encoding/csv" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "io" - "log" "net" "net/http" "net/url" @@ -14,6 +11,10 @@ import ( "strings" "sync" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" + "github.com/influxdata/telegraf/plugins/inputs" ) //CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1 @@ -114,18 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup + errChan := errchan.New(len(g.Servers)) + wg.Add(len(g.Servers)) for _, server := range g.Servers { - wg.Add(1) go func(serv string) { defer wg.Done() - if err := g.gatherServer(serv, acc); err != nil { - log.Printf("HAProxy error gathering server: %s, %s", serv, err) - } + errChan.C <- g.gatherServer(serv, acc) }(server) } wg.Wait() - return nil + return errChan.Error() } func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index bf6859002..18d666a08 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -5,9 +5,11 @@ import ( "fmt" "net/http" "strconv" + "sync" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { } } - var errChan = make(chan error, len(gatherFunctions)) - + var wg sync.WaitGroup + wg.Add(len(gatherFunctions)) + errChan := errchan.New(len(gatherFunctions)) for _, f := range gatherFunctions { - go f(r, acc, errChan) + go func(gf gatherFunc) { + defer wg.Done() + gf(r, acc, errChan.C) + }(f) } + wg.Wait() - for i := 1; i <= len(gatherFunctions); i++ { - err := <-errChan - if err != nil { - return err - } - } - - return nil + return errChan.Error() } func (r *RabbitMQ) requestJSON(u string, target interface{}) error {