From 322871ddfa2158705fffa5be72b7f099dbdfb21c Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Mon, 29 Feb 2016 17:52:58 +0100 Subject: [PATCH] Improve timeout in input plugins --- plugins/inputs/apache/apache.go | 5 ++- plugins/inputs/couchdb/couchdb.go | 12 ++++++- plugins/inputs/disque/disque.go | 8 ++++- plugins/inputs/dovecot/dovecot.go | 7 +++- plugins/inputs/elasticsearch/elasticsearch.go | 7 +++- plugins/inputs/haproxy/haproxy.go | 7 ++-- plugins/inputs/httpjson/httpjson.go | 7 +++- plugins/inputs/influxdb/influxdb.go | 12 ++++++- plugins/inputs/jolokia/jolokia.go | 8 ++++- plugins/inputs/mailchimp/chimp_api.go | 6 +++- plugins/inputs/mesos/mesos.go | 12 ++++++- plugins/inputs/mysql/mysql.go | 23 ++++++++++++ plugins/inputs/mysql/mysql_test.go | 35 +++++++++++++++++++ plugins/inputs/nginx/nginx.go | 5 ++- plugins/inputs/nsq/nsq.go | 5 ++- plugins/inputs/prometheus/prometheus.go | 12 ++++++- plugins/inputs/rabbitmq/rabbitmq.go | 6 +++- plugins/inputs/raindrops/raindrops.go | 9 +++-- plugins/inputs/redis/redis.go | 8 ++++- plugins/inputs/riak/riak.go | 8 ++++- plugins/inputs/zookeeper/zookeeper.go | 3 ++ 21 files changed, 184 insertions(+), 21 deletions(-) diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index b6e3e50f1..eba5a1188 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -58,7 +58,10 @@ var tr = &http.Transport{ ResponseHeaderTimeout: time.Duration(3 * time.Second), } -var client = &http.Client{Transport: tr} +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { resp, err := client.Get(addr.String()) diff --git a/plugins/inputs/couchdb/couchdb.go b/plugins/inputs/couchdb/couchdb.go index ba64e4a6d..bf241649a 100644 --- a/plugins/inputs/couchdb/couchdb.go +++ b/plugins/inputs/couchdb/couchdb.go @@ -10,6 +10,7 @@ import ( "reflect" "strings" "sync" + "time" ) // Schema: @@ -112,9 +113,18 @@ func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error { } +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} + func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error { - response, error := http.Get(host) + response, error := client.Get(host) if error != nil { return error } diff --git a/plugins/inputs/disque/disque.go b/plugins/inputs/disque/disque.go index a311b6739..822e5924f 100644 --- a/plugins/inputs/disque/disque.go +++ b/plugins/inputs/disque/disque.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -30,6 +31,8 @@ var sampleConfig = ` servers = ["localhost"] ` +var defaultTimeout = 5 * time.Second + func (r *Disque) SampleConfig() string { return sampleConfig } @@ -107,7 +110,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { addr.Host = addr.Host + ":" + defaultPort } - c, err := net.Dial("tcp", addr.Host) + c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout) if err != nil { return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err) } @@ -132,6 +135,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { g.c = c } + // Extend connection + g.c.SetDeadline(time.Now().Add(defaultTimeout)) + g.c.Write([]byte("info\r\n")) r := bufio.NewReader(g.c) diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index 75829f595..3a6607da9 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -34,6 +34,8 @@ var sampleConfig = ` domains = [] ` +var defaultTimeout = time.Second * time.Duration(5) + func (d *Dovecot) SampleConfig() string { return sampleConfig } const defaultPort = "24242" @@ -74,12 +76,15 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, doms map[s return fmt.Errorf("Error: %s on url %s\n", err, addr) } - c, err := net.Dial("tcp", addr) + c, err := net.DialTimeout("tcp", addr, defaultTimeout) if err != nil { return fmt.Errorf("Unable to connect to dovecot server '%s': %s", addr, err) } defer c.Close() + // Extend connection + c.SetDeadline(time.Now().Add(defaultTimeout)) + c.Write([]byte("EXPORT\tdomain\n\n")) var buf bytes.Buffer io.Copy(&buf, c) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index aae97f4d7..32bd58516 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -81,7 +81,12 @@ type Elasticsearch struct { // NewElasticsearch return a new instance of Elasticsearch func NewElasticsearch() *Elasticsearch { - return &Elasticsearch{client: http.DefaultClient} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + return &Elasticsearch{client: client} } // SampleConfig returns sample configuration for this plugin. diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index 233cd8481..b1402d8ec 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -129,8 +129,11 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error { func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { if g.client == nil { - - client := &http.Client{} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } g.client = client } diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index c055f66de..c07a9602a 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -244,6 +244,11 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) { func init() { inputs.Add("httpjson", func() telegraf.Input { - return &HttpJson{client: RealHTTPClient{client: &http.Client{}}} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + return &HttpJson{client: RealHTTPClient{client: client}} }) } diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index 63a3c1854..5af9a0731 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -7,6 +7,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -70,6 +71,15 @@ type point struct { Values map[string]interface{} `json:"values"` } +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} + // Gathers data from a particular URL // Parameters: // acc : The telegraf Accumulator to use @@ -81,7 +91,7 @@ func (i *InfluxDB) gatherURL( acc telegraf.Accumulator, url string, ) error { - resp, err := http.Get(url) + resp, err := client.Get(url) if err != nil { return err } diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index 2e0bba6d5..a65f5ff8f 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "net/url" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -160,6 +161,11 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error { func init() { inputs.Add("jolokia", func() telegraf.Input { - return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + return &Jolokia{jClient: &JolokiaClientImpl{client: client}} }) } diff --git a/plugins/inputs/mailchimp/chimp_api.go b/plugins/inputs/mailchimp/chimp_api.go index fe2c56d0c..75c9a30d7 100644 --- a/plugins/inputs/mailchimp/chimp_api.go +++ b/plugins/inputs/mailchimp/chimp_api.go @@ -10,6 +10,7 @@ import ( "net/url" "regexp" "sync" + "time" ) const ( @@ -120,7 +121,10 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) { } func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { - client := &http.Client{Transport: api.Transport} + client := &http.Client{ + Transport: api.Transport, + Timeout: time.Duration(4 * time.Second), + } var b bytes.Buffer req, err := http.NewRequest("GET", api.url.String(), &b) diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index 5bcda7970..ccb76daae 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -261,6 +262,15 @@ func (m *Mesos) removeGroup(j *map[string]interface{}) { } } +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} + // This should not belong to the object func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var jsonOut map[string]interface{} @@ -282,7 +292,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { ts := strconv.Itoa(m.Timeout) + "ms" - resp, err := http.Get("http://" + a + "/metrics/snapshot?timeout=" + ts) + resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts) if err != nil { return err diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index b2e2729a9..cd9e7ae28 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -2,8 +2,10 @@ package mysql import ( "database/sql" + "net/url" "strconv" "strings" + "time" _ "github.com/go-sql-driver/mysql" "github.com/influxdata/telegraf" @@ -26,6 +28,8 @@ var sampleConfig = ` servers = ["tcp(127.0.0.1:3306)/"] ` +var defaultTimeout = time.Second * time.Duration(5) + func (m *Mysql) SampleConfig() string { return sampleConfig } @@ -122,6 +126,10 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { serv = "" } + serv, err := dsnAddTimeout(serv) + if err != nil { + return err + } db, err := sql.Open("mysql", serv) if err != nil { return err @@ -207,6 +215,21 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { return nil } +func dsnAddTimeout(dsn string) (string, error) { + u, err := url.Parse(dsn) + if err != nil { + return "", err + } + v := u.Query() + + // Only override timeout if not already defined + if _, ok := v["timeout"]; ok == false { + v.Add("timeout", defaultTimeout.String()) + u.RawQuery = v.Encode() + } + return u.String(), nil +} + func init() { inputs.Add("mysql", func() telegraf.Input { return &Mysql{} diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 855e8ba52..dffc328fa 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -84,3 +84,38 @@ func TestMysqlParseDSN(t *testing.T) { } } } + +func TestMysqlDNSAddTimeout(t *testing.T) { + tests := []struct { + input string + output string + }{ + { + "", + "?timeout=5s", + }, + { + "127.0.0.1", + "127.0.0.1?timeout=5s", + }, + { + "tcp(192.168.1.1:3306)/", + "tcp(192.168.1.1:3306)/?timeout=5s", + }, + { + "root:passwd@tcp(192.168.1.1:3306)/?tls=false", + "root:passwd@tcp(192.168.1.1:3306)/?timeout=5s&tls=false", + }, + { + "root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s", + "root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s", + }, + } + + for _, test := range tests { + output, _ := parseDSN(test.input) + if output != test.output { + t.Errorf("Expected %s, got %s\n", test.output, output) + } + } +} diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index 3b008fbf3..c13ba39f3 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -58,7 +58,10 @@ var tr = &http.Transport{ ResponseHeaderTimeout: time.Duration(3 * time.Second), } -var client = &http.Client{Transport: tr} +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { resp, err := client.Get(addr.String()) diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 6b3be66f2..35ba76866 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -84,7 +84,10 @@ var tr = &http.Transport{ ResponseHeaderTimeout: time.Duration(3 * time.Second), } -var client = &http.Client{Transport: tr} +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { u, err := buildURL(e) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 188e6b914..5873b27cc 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "sync" + "time" ) type Prometheus struct { @@ -51,8 +52,17 @@ func (g *Prometheus) Gather(acc telegraf.Accumulator) error { return outerr } +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} + func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { - resp, err := http.Get(url) + resp, err := client.Get(url) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) } diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index e51d65e15..4d119282d 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -122,7 +122,11 @@ func (r *RabbitMQ) Description() string { func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { if r.Client == nil { - r.Client = &http.Client{} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + r.Client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } } var errChan = make(chan error, len(gatherFunctions)) diff --git a/plugins/inputs/raindrops/raindrops.go b/plugins/inputs/raindrops/raindrops.go index fed22b693..6851f5d93 100644 --- a/plugins/inputs/raindrops/raindrops.go +++ b/plugins/inputs/raindrops/raindrops.go @@ -177,8 +177,11 @@ func (r *Raindrops) getTags(addr *url.URL) map[string]string { func init() { inputs.Add("raindrops", func() telegraf.Input { - return &Raindrops{http_client: &http.Client{Transport: &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), - }}} + return &Raindrops{http_client: &http.Client{ + Transport: &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + }, + Timeout: time.Duration(4 * time.Second), + }} }) } diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index b8862f6bc..859b23a22 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -30,6 +31,8 @@ var sampleConfig = ` servers = ["tcp://localhost:6379"] ` +var defaultTimeout = 5 * time.Second + func (r *Redis) SampleConfig() string { return sampleConfig } @@ -120,12 +123,15 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { addr.Host = addr.Host + ":" + defaultPort } - c, err := net.Dial("tcp", addr.Host) + c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout) if err != nil { return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) } defer c.Close() + // Extend connection + c.SetDeadline(time.Now().Add(defaultTimeout)) + if addr.User != nil { pwd, set := addr.User.Password() if set && pwd != "" { diff --git a/plugins/inputs/riak/riak.go b/plugins/inputs/riak/riak.go index 6750c75a0..56231176b 100644 --- a/plugins/inputs/riak/riak.go +++ b/plugins/inputs/riak/riak.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/url" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -20,7 +21,12 @@ type Riak struct { // NewRiak return a new instance of Riak with a default http client func NewRiak() *Riak { - return &Riak{client: http.DefaultClient} + tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + return &Riak{client: client} } // Type riakStats represents the data that is received from Riak diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index 0f2b2e06f..54defc56f 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -67,6 +67,9 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error } defer c.Close() + // Extend connection + c.SetDeadline(time.Now().Add(defaultTimeout)) + fmt.Fprintf(c, "%s\n", "mntr") rdr := bufio.NewReader(c) scanner := bufio.NewScanner(rdr)