New object: ErrChan for concurrent err handling

This commit is contained in:
Cameron Sparr 2016-06-02 12:34:03 +01:00 committed by Vladimir Sagan
parent 80ef807ecf
commit bf4b009a21
6 changed files with 75 additions and 54 deletions

View File

@ -0,0 +1,37 @@
package errchan
import (
"fmt"
"strings"
)
type ErrChan struct {
C chan error
}
// New returns an error channel of max length 'n'
// errors can be sent to the ErrChan.C channel, and will be returned when
// ErrChan.Error() is called.
func New(n int) *ErrChan {
return &ErrChan{
C: make(chan error, n),
}
}
// Error closes the ErrChan.C channel and returns an error if there are any
// non-nil errors, otherwise returns nil.
func (e *ErrChan) Error() error {
close(e.C)
var out string
for err := range e.C {
if err != nil {
out += "[" + err.Error() + "], "
}
}
if out != "" {
return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", "))
}
return nil
}

View File

@ -3,6 +3,7 @@ package cloudwatch
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -12,6 +13,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws" internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -166,7 +168,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
} }
metricCount := len(metrics) metricCount := len(metrics)
var errChan = make(chan error, metricCount) errChan := errchan.New(metricCount)
now := time.Now() now := time.Now()
@ -175,18 +177,18 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second) lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop() defer lmtr.Stop()
var wg sync.WaitGroup
wg.Add(len(metrics))
for _, m := range metrics { for _, m := range metrics {
<-lmtr.C <-lmtr.C
go c.gatherMetric(acc, m, now, errChan) go func(inm *cloudwatch.Metric) {
defer wg.Done()
c.gatherMetric(acc, inm, now, errChan.C)
}(m)
} }
wg.Wait()
for i := 1; i <= metricCount; i++ { return errChan.Error()
err := <-errChan
if err != nil {
return err
}
}
return nil
} }
func init() { func init() {

View File

@ -2,14 +2,13 @@ package elasticsearch
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the // Gather reads the stats from Elasticsearch and writes it to the
// Accumulator. // Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan := make(chan error, len(e.Servers)) errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(e.Servers)) wg.Add(len(e.Servers))
@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
url = s + statsPath url = s + statsPath
} }
if err := e.gatherNodeStats(url, acc); err != nil { if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err errChan.C <- err
return return
} }
if e.ClusterHealth { if e.ClusterHealth {
@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
} }
wg.Wait() wg.Wait()
close(errChan) return errChan.Error()
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
} }
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
} }
} }
e.errChan = make(chan error, len(commands)) errChan := errchan.New(len(commands))
e.errChan = errChan.C
e.wg.Add(len(commands)) e.wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(command, acc) go e.ProcessCommand(command, acc)
} }
e.wg.Wait() e.wg.Wait()
return errChan.Error()
select {
default:
close(e.errChan)
return nil
case err := <-e.errChan:
close(e.errChan)
return err
}
} }
func init() { func init() {

View File

@ -3,10 +3,7 @@ package haproxy
import ( import (
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io" "io"
"log"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -14,6 +11,10 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
) )
//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1 //CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@ -114,18 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := errchan.New(len(g.Servers))
wg.Add(len(g.Servers))
for _, server := range g.Servers { for _, server := range g.Servers {
wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
if err := g.gatherServer(serv, acc); err != nil { errChan.C <- g.gatherServer(serv, acc)
log.Printf("HAProxy error gathering server: %s, %s", serv, err)
}
}(server) }(server)
} }
wg.Wait() wg.Wait()
return nil return errChan.Error()
} }
func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error { func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {

View File

@ -5,9 +5,11 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
} }
} }
var errChan = make(chan error, len(gatherFunctions)) var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
errChan := errchan.New(len(gatherFunctions))
for _, f := range gatherFunctions { for _, f := range gatherFunctions {
go f(r, acc, errChan) go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc, errChan.C)
}(f)
} }
wg.Wait()
for i := 1; i <= len(gatherFunctions); i++ { return errChan.Error()
err := <-errChan
if err != nil {
return err
}
}
return nil
} }
func (r *RabbitMQ) requestJSON(u string, target interface{}) error { func (r *RabbitMQ) requestJSON(u string, target interface{}) error {