From 2cefe2fc2ab9d6e73093e765414c181b45c6677f Mon Sep 17 00:00:00 2001 From: Sergio Jimenez Date: Tue, 2 Feb 2016 02:17:38 +0100 Subject: [PATCH] plugin(mesos): Added goroutines. The plugin will iterate over the Servers slice and create a goroutine for each of them. --- plugins/inputs/mesos/mesos.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index 1607b1b42..ef1b8269d 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "net" "net/http" + "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -196,15 +198,37 @@ func (m *Mesos) Description() string { } func (m *Mesos) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + var errorChannel chan error + if len(m.Servers) == 0 { - return m.gatherMetrics("localhost:5050", acc) + m.Servers = []string{"localhost:5050"} } + errorChannel = make(chan error, len(m.Servers)*2) + for _, v := range m.Servers { - if err := m.gatherMetrics(v, acc); err != nil { - return err + wg.Add(1) + go func() { + errorChannel <- m.gatherMetrics(v, acc) + wg.Done() + return + }() + } + + wg.Wait() + close(errorChannel) + errorStrings := []string{} + + for err := range errorChannel { + if err != nil { + errorStrings = append(errorStrings, err.Error()) } } + + if len(errorStrings) > 0 { + return errors.New(strings.Join(errorStrings, "\n")) + } return nil }