Fix err race condition and partial failure issues

closes #1439
closes #1440
closes #1441
closes #1442
closes #1443
closes #1444
closes #1445
This commit is contained in:
Cameron Sparr 2016-07-19 14:03:28 +01:00
parent cbf5a55c7d
commit 82166a36d0
9 changed files with 53 additions and 52 deletions

View File

@ -1,5 +1,11 @@
## v1.0 [unreleased] ## v1.0 [unreleased]
### Features
### Bugfixes
- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures.
## v1.0 beta 3 [2016-07-18] ## v1.0 beta 3 [2016-07-18]
### Release Notes ### Release Notes

View File

@ -3,12 +3,14 @@ package dns_query
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/miekg/dns" "github.com/miekg/dns"
"net" "net"
"strconv" "strconv"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
) )
type DnsQuery struct { type DnsQuery struct {
@ -55,12 +57,12 @@ func (d *DnsQuery) Description() string {
} }
func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
d.setDefaultValues() d.setDefaultValues()
errChan := errchan.New(len(d.Domains) * len(d.Servers))
for _, domain := range d.Domains { for _, domain := range d.Domains {
for _, server := range d.Servers { for _, server := range d.Servers {
dnsQueryTime, err := d.getDnsQueryTime(domain, server) dnsQueryTime, err := d.getDnsQueryTime(domain, server)
if err != nil { errChan.C <- err
return err
}
tags := map[string]string{ tags := map[string]string{
"server": server, "server": server,
"domain": domain, "domain": domain,
@ -72,7 +74,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
} }
} }
return nil return errChan.Error()
} }
func (d *DnsQuery) setDefaultValues() { func (d *DnsQuery) setDefaultValues() {

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -51,7 +52,6 @@ const defaultPort = "24242"
// Reads stats from all configured servers. // Reads stats from all configured servers.
func (d *Dovecot) Gather(acc telegraf.Accumulator) error { func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
if !validQuery[d.Type] { if !validQuery[d.Type] {
return fmt.Errorf("Error: %s is not a valid query type\n", return fmt.Errorf("Error: %s is not a valid query type\n",
d.Type) d.Type)
@ -61,31 +61,27 @@ func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
d.Servers = append(d.Servers, "127.0.0.1:24242") d.Servers = append(d.Servers, "127.0.0.1:24242")
} }
var wg sync.WaitGroup
var outerr error
if len(d.Filters) <= 0 { if len(d.Filters) <= 0 {
d.Filters = append(d.Filters, "") d.Filters = append(d.Filters, "")
} }
for _, serv := range d.Servers { var wg sync.WaitGroup
errChan := errchan.New(len(d.Servers) * len(d.Filters))
for _, server := range d.Servers {
for _, filter := range d.Filters { for _, filter := range d.Filters {
wg.Add(1) wg.Add(1)
go func(serv string, filter string) { go func(s string, f string) {
defer wg.Done() defer wg.Done()
outerr = d.gatherServer(serv, acc, d.Type, filter) errChan.C <- d.gatherServer(s, acc, d.Type, f)
}(serv, filter) }(server, filter)
} }
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error { func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error {
_, _, err := net.SplitHostPort(addr) _, _, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
return fmt.Errorf("Error: %s on url %s\n", err, addr) return fmt.Errorf("Error: %s on url %s\n", err, addr)

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -73,19 +74,16 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error {
return m.gatherServer(":11211", false, acc) return m.gatherServer(":11211", false, acc)
} }
errChan := errchan.New(len(m.Servers) + len(m.UnixSockets))
for _, serverAddress := range m.Servers { for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, false, acc); err != nil { errChan.C <- m.gatherServer(serverAddress, false, acc)
return err
}
} }
for _, unixAddress := range m.UnixSockets { for _, unixAddress := range m.UnixSockets {
if err := m.gatherServer(unixAddress, true, acc); err != nil { errChan.C <- m.gatherServer(unixAddress, true, acc)
return err
}
} }
return nil return errChan.Error()
} }
func (m *Memcached) gatherServer( func (m *Memcached) gatherServer(

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2" "gopkg.in/mgo.v2"
) )
@ -55,9 +56,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := errchan.New(len(m.Servers))
var outerr error
for _, serv := range m.Servers { for _, serv := range m.Servers {
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
@ -73,13 +72,12 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(srv *Server) { go func(srv *Server) {
defer wg.Done() defer wg.Done()
outerr = m.gatherServer(srv, acc) errChan.C <- m.gatherServer(srv, acc)
}(m.getMongoServer(u)) }(m.getMongoServer(u))
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
func (m *MongoDB) getMongoServer(url *url.URL) *Server { func (m *MongoDB) getMongoServer(url *url.URL) *Server {

View File

@ -7,10 +7,12 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -118,26 +120,27 @@ func (m *Mysql) InitMysql() {
func (m *Mysql) Gather(acc telegraf.Accumulator) error { func (m *Mysql) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 { if len(m.Servers) == 0 {
// if we can't get stats in this case, thats fine, don't report // default to localhost if nothing specified.
// an error. return m.gatherServer(localhost, acc)
m.gatherServer(localhost, acc)
return nil
} }
// Initialise additional query intervals // Initialise additional query intervals
if !initDone { if !initDone {
m.InitMysql() m.InitMysql()
} }
var wg sync.WaitGroup
errChan := errchan.New(len(m.Servers))
// Loop through each server and collect metrics // Loop through each server and collect metrics
for _, serv := range m.Servers { for _, server := range m.Servers {
err := m.gatherServer(serv, acc) wg.Add(1)
if err != nil { go func(s string) {
return err defer wg.Done()
} errChan.C <- m.gatherServer(s, acc)
}(server)
} }
return nil wg.Wait()
return errChan.Error()
} }
type mapping struct { type mapping struct {

View File

@ -20,7 +20,6 @@ func TestMysqlDefaultsToLocal(t *testing.T) {
} }
var acc testutil.Accumulator var acc testutil.Accumulator
err := m.Gather(&acc) err := m.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -34,7 +35,7 @@ func (n *Nginx) Description() string {
func (n *Nginx) Gather(acc telegraf.Accumulator) error { func (n *Nginx) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error errChan := errchan.New(len(n.Urls))
for _, u := range n.Urls { for _, u := range n.Urls {
addr, err := url.Parse(u) addr, err := url.Parse(u)
@ -45,13 +46,12 @@ func (n *Nginx) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(addr *url.URL) { go func(addr *url.URL) {
defer wg.Done() defer wg.Done()
outerr = n.gatherUrl(addr, acc) errChan.C <- n.gatherUrl(addr, acc)
}(addr) }(addr)
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
var tr = &http.Transport{ var tr = &http.Transport{

View File

@ -32,6 +32,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -65,19 +66,17 @@ func (n *NSQ) Description() string {
func (n *NSQ) Gather(acc telegraf.Accumulator) error { func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error errChan := errchan.New(len(n.Endpoints))
for _, e := range n.Endpoints { for _, e := range n.Endpoints {
wg.Add(1) wg.Add(1)
go func(e string) { go func(e string) {
defer wg.Done() defer wg.Done()
outerr = n.gatherEndpoint(e, acc) errChan.C <- n.gatherEndpoint(e, acc)
}(e) }(e)
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
var tr = &http.Transport{ var tr = &http.Transport{