Improve timeout in input plugins

This commit is contained in:
Pierre Fersing 2016-02-29 17:52:58 +01:00 committed by Michele Fadda
parent 7bb8f99f54
commit 1bd4e8f0f1
21 changed files with 184 additions and 21 deletions

View File

@ -58,7 +58,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := client.Get(addr.String())

View File

@ -10,6 +10,7 @@ import (
"reflect"
"strings"
"sync"
"time"
)
// Schema:
@ -112,9 +113,18 @@ func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error {
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error {
response, error := http.Get(host)
response, error := client.Get(host)
if error != nil {
return error
}

View File

@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -30,6 +31,8 @@ var sampleConfig = `
servers = ["localhost"]
`
var defaultTimeout = 5 * time.Second
func (r *Disque) SampleConfig() string {
return sampleConfig
}
@ -107,7 +110,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.Dial("tcp", addr.Host)
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err)
}
@ -132,6 +135,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
g.c = c
}
// Extend connection
g.c.SetDeadline(time.Now().Add(defaultTimeout))
g.c.Write([]byte("info\r\n"))
r := bufio.NewReader(g.c)

View File

@ -34,6 +34,8 @@ var sampleConfig = `
domains = []
`
var defaultTimeout = time.Second * time.Duration(5)
func (d *Dovecot) SampleConfig() string { return sampleConfig }
const defaultPort = "24242"
@ -74,12 +76,15 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, doms map[s
return fmt.Errorf("Error: %s on url %s\n", err, addr)
}
c, err := net.Dial("tcp", addr)
c, err := net.DialTimeout("tcp", addr, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to dovecot server '%s': %s", addr, err)
}
defer c.Close()
// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))
c.Write([]byte("EXPORT\tdomain\n\n"))
var buf bytes.Buffer
io.Copy(&buf, c)

View File

@ -81,7 +81,12 @@ type Elasticsearch struct {
// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{client: http.DefaultClient}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Elasticsearch{client: client}
}
// SampleConfig returns sample configuration for this plugin.

View File

@ -129,8 +129,11 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
if g.client == nil {
client := &http.Client{}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
g.client = client
}

View File

@ -244,6 +244,11 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
func init() {
inputs.Add("httpjson", func() telegraf.Input {
return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &HttpJson{client: RealHTTPClient{client: client}}
})
}

View File

@ -7,6 +7,7 @@ import (
"net/http"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -70,6 +71,15 @@ type point struct {
Values map[string]interface{} `json:"values"`
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
@ -81,7 +91,7 @@ func (i *InfluxDB) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
resp, err := http.Get(url)
resp, err := client.Get(url)
if err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -160,6 +161,11 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("jolokia", func() telegraf.Input {
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Jolokia{jClient: &JolokiaClientImpl{client: client}}
})
}

View File

@ -10,6 +10,7 @@ import (
"net/url"
"regexp"
"sync"
"time"
)
const (
@ -120,7 +121,10 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) {
}
func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
client := &http.Client{Transport: api.Transport}
client := &http.Client{
Transport: api.Transport,
Timeout: time.Duration(4 * time.Second),
}
var b bytes.Buffer
req, err := http.NewRequest("GET", api.url.String(), &b)

View File

@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -261,6 +262,15 @@ func (m *Mesos) removeGroup(j *map[string]interface{}) {
}
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// This should not belong to the object
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
@ -282,7 +292,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := http.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil {
return err

View File

@ -2,8 +2,10 @@ package mysql
import (
"database/sql"
"net/url"
"strconv"
"strings"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf"
@ -26,6 +28,8 @@ var sampleConfig = `
servers = ["tcp(127.0.0.1:3306)/"]
`
var defaultTimeout = time.Second * time.Duration(5)
func (m *Mysql) SampleConfig() string {
return sampleConfig
}
@ -122,6 +126,10 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
serv = ""
}
serv, err := dsnAddTimeout(serv)
if err != nil {
return err
}
db, err := sql.Open("mysql", serv)
if err != nil {
return err
@ -207,6 +215,21 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return nil
}
func dsnAddTimeout(dsn string) (string, error) {
u, err := url.Parse(dsn)
if err != nil {
return "", err
}
v := u.Query()
// Only override timeout if not already defined
if _, ok := v["timeout"]; ok == false {
v.Add("timeout", defaultTimeout.String())
u.RawQuery = v.Encode()
}
return u.String(), nil
}
func init() {
inputs.Add("mysql", func() telegraf.Input {
return &Mysql{}

View File

@ -84,3 +84,38 @@ func TestMysqlParseDSN(t *testing.T) {
}
}
}
func TestMysqlDNSAddTimeout(t *testing.T) {
tests := []struct {
input string
output string
}{
{
"",
"?timeout=5s",
},
{
"127.0.0.1",
"127.0.0.1?timeout=5s",
},
{
"tcp(192.168.1.1:3306)/",
"tcp(192.168.1.1:3306)/?timeout=5s",
},
{
"root:passwd@tcp(192.168.1.1:3306)/?tls=false",
"root:passwd@tcp(192.168.1.1:3306)/?timeout=5s&tls=false",
},
{
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
},
}
for _, test := range tests {
output, _ := parseDSN(test.input)
if output != test.output {
t.Errorf("Expected %s, got %s\n", test.output, output)
}
}
}

View File

@ -58,7 +58,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := client.Get(addr.String())

View File

@ -84,7 +84,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)

View File

@ -10,6 +10,7 @@ import (
"io"
"net/http"
"sync"
"time"
)
type Prometheus struct {
@ -51,8 +52,17 @@ func (g *Prometheus) Gather(acc telegraf.Accumulator) error {
return outerr
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
resp, err := http.Get(url)
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}

View File

@ -122,7 +122,11 @@ func (r *RabbitMQ) Description() string {
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
if r.Client == nil {
r.Client = &http.Client{}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
r.Client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
}
var errChan = make(chan error, len(gatherFunctions))

View File

@ -177,8 +177,11 @@ func (r *Raindrops) getTags(addr *url.URL) map[string]string {
func init() {
inputs.Add("raindrops", func() telegraf.Input {
return &Raindrops{http_client: &http.Client{Transport: &http.Transport{
return &Raindrops{http_client: &http.Client{
Transport: &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}}}
},
Timeout: time.Duration(4 * time.Second),
}}
})
}

View File

@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -30,6 +31,8 @@ var sampleConfig = `
servers = ["tcp://localhost:6379"]
`
var defaultTimeout = 5 * time.Second
func (r *Redis) SampleConfig() string {
return sampleConfig
}
@ -120,12 +123,15 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.Dial("tcp", addr.Host)
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
}
defer c.Close()
// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -20,7 +21,12 @@ type Riak struct {
// NewRiak return a new instance of Riak with a default http client
func NewRiak() *Riak {
return &Riak{client: http.DefaultClient}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Riak{client: client}
}
// Type riakStats represents the data that is received from Riak

View File

@ -67,6 +67,9 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
}
defer c.Close()
// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))
fmt.Fprintf(c, "%s\n", "mntr")
rdr := bufio.NewReader(c)
scanner := bufio.NewScanner(rdr)