From 82166a36d02e21524c65ef8fcfeb1f0da55bc100 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 19 Jul 2016 14:03:28 +0100 Subject: [PATCH] Fix err race condition and partial failure issues closes #1439 closes #1440 closes #1441 closes #1442 closes #1443 closes #1444 closes #1445 --- CHANGELOG.md | 6 ++++++ plugins/inputs/dns_query/dns_query.go | 14 ++++++++------ plugins/inputs/dovecot/dovecot.go | 20 ++++++++------------ plugins/inputs/memcached/memcached.go | 12 +++++------- plugins/inputs/mongodb/mongodb.go | 10 ++++------ plugins/inputs/mysql/mysql.go | 25 ++++++++++++++----------- plugins/inputs/mysql/mysql_test.go | 1 - plugins/inputs/nginx/nginx.go | 8 ++++---- plugins/inputs/nsq/nsq.go | 9 ++++----- 9 files changed, 53 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ca37b1e7..76263dc69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## v1.0 [unreleased] +### Features + +### Bugfixes + +- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. + ## v1.0 beta 3 [2016-07-18] ### Release Notes diff --git a/plugins/inputs/dns_query/dns_query.go b/plugins/inputs/dns_query/dns_query.go index 2231f2921..1bccc52c0 100644 --- a/plugins/inputs/dns_query/dns_query.go +++ b/plugins/inputs/dns_query/dns_query.go @@ -3,12 +3,14 @@ package dns_query import ( "errors" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "github.com/miekg/dns" "net" "strconv" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" + "github.com/influxdata/telegraf/plugins/inputs" ) type DnsQuery struct { @@ -55,12 +57,12 @@ func (d *DnsQuery) Description() string { } func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { d.setDefaultValues() + + errChan := errchan.New(len(d.Domains) * len(d.Servers)) for _, domain := range d.Domains { for _, server := range d.Servers { dnsQueryTime, err := d.getDnsQueryTime(domain, server) - if err != nil { - return err - } + errChan.C <- err tags := map[string]string{ "server": server, "domain": domain, @@ -72,7 +74,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { } } - return nil + return errChan.Error() } func (d *DnsQuery) setDefaultValues() { diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index 0347016d1..56290e759 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -51,7 +52,6 @@ const defaultPort = "24242" // Reads stats from all configured servers. func (d *Dovecot) Gather(acc telegraf.Accumulator) error { - if !validQuery[d.Type] { return fmt.Errorf("Error: %s is not a valid query type\n", d.Type) @@ -61,31 +61,27 @@ func (d *Dovecot) Gather(acc telegraf.Accumulator) error { d.Servers = append(d.Servers, "127.0.0.1:24242") } - var wg sync.WaitGroup - - var outerr error - if len(d.Filters) <= 0 { d.Filters = append(d.Filters, "") } - for _, serv := range d.Servers { + var wg sync.WaitGroup + errChan := errchan.New(len(d.Servers) * len(d.Filters)) + for _, server := range d.Servers { for _, filter := range d.Filters { wg.Add(1) - go func(serv string, filter string) { + go func(s string, f string) { defer wg.Done() - outerr = d.gatherServer(serv, acc, d.Type, filter) - }(serv, filter) + errChan.C <- d.gatherServer(s, acc, d.Type, f) + }(server, filter) } } wg.Wait() - - return outerr + return errChan.Error() } func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error { - _, _, err := net.SplitHostPort(addr) if err != nil { return fmt.Errorf("Error: %s on url %s\n", err, addr) diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index c631a1ed1..5ee538e93 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -73,19 +74,16 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error { return m.gatherServer(":11211", false, acc) } + errChan := errchan.New(len(m.Servers) + len(m.UnixSockets)) for _, serverAddress := range m.Servers { - if err := m.gatherServer(serverAddress, false, acc); err != nil { - return err - } + errChan.C <- m.gatherServer(serverAddress, false, acc) } for _, unixAddress := range m.UnixSockets { - if err := m.gatherServer(unixAddress, true, acc); err != nil { - return err - } + errChan.C <- m.gatherServer(unixAddress, true, acc) } - return nil + return errChan.Error() } func (m *Memcached) gatherServer( diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index 0fdb90f74..a4bdabd96 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -10,6 +10,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/mgo.v2" ) @@ -55,9 +56,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - - var outerr error - + errChan := errchan.New(len(m.Servers)) for _, serv := range m.Servers { u, err := url.Parse(serv) if err != nil { @@ -73,13 +72,12 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(srv *Server) { defer wg.Done() - outerr = m.gatherServer(srv, acc) + errChan.C <- m.gatherServer(srv, acc) }(m.getMongoServer(u)) } wg.Wait() - - return outerr + return errChan.Error() } func (m *MongoDB) getMongoServer(url *url.URL) *Server { diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 5011e82b9..10b8c2f75 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -7,10 +7,12 @@ import ( "net/url" "strconv" "strings" + "sync" "time" _ "github.com/go-sql-driver/mysql" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -118,26 +120,27 @@ func (m *Mysql) InitMysql() { func (m *Mysql) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 { - // if we can't get stats in this case, thats fine, don't report - // an error. - m.gatherServer(localhost, acc) - return nil + // default to localhost if nothing specified. + return m.gatherServer(localhost, acc) } - // Initialise additional query intervals if !initDone { m.InitMysql() } + var wg sync.WaitGroup + errChan := errchan.New(len(m.Servers)) // Loop through each server and collect metrics - for _, serv := range m.Servers { - err := m.gatherServer(serv, acc) - if err != nil { - return err - } + for _, server := range m.Servers { + wg.Add(1) + go func(s string) { + defer wg.Done() + errChan.C <- m.gatherServer(s, acc) + }(server) } - return nil + wg.Wait() + return errChan.Error() } type mapping struct { diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 989c21722..3ab9187b5 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -20,7 +20,6 @@ func TestMysqlDefaultsToLocal(t *testing.T) { } var acc testutil.Accumulator - err := m.Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index b15b539de..3fe8c04d1 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -34,7 +35,7 @@ func (n *Nginx) Description() string { func (n *Nginx) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error + errChan := errchan.New(len(n.Urls)) for _, u := range n.Urls { addr, err := url.Parse(u) @@ -45,13 +46,12 @@ func (n *Nginx) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(addr *url.URL) { defer wg.Done() - outerr = n.gatherUrl(addr, acc) + errChan.C <- n.gatherUrl(addr, acc) }(addr) } wg.Wait() - - return outerr + return errChan.Error() } var tr = &http.Transport{ diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 35ba76866..8bfd72788 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -32,6 +32,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -65,19 +66,17 @@ func (n *NSQ) Description() string { func (n *NSQ) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - + errChan := errchan.New(len(n.Endpoints)) for _, e := range n.Endpoints { wg.Add(1) go func(e string) { defer wg.Done() - outerr = n.gatherEndpoint(e, acc) + errChan.C <- n.gatherEndpoint(e, acc) }(e) } wg.Wait() - - return outerr + return errChan.Error() } var tr = &http.Transport{