Improve timeout in input plugins
This commit is contained in:
parent
43b7ce4f6d
commit
322871ddfa
|
@ -58,7 +58,10 @@ var tr = &http.Transport{
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
var client = &http.Client{Transport: tr}
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
resp, err := client.Get(addr.String())
|
resp, err := client.Get(addr.String())
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Schema:
|
// Schema:
|
||||||
|
@ -112,9 +113,18 @@ func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tr = &http.Transport{
|
||||||
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error {
|
func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error {
|
||||||
|
|
||||||
response, error := http.Get(host)
|
response, error := client.Get(host)
|
||||||
if error != nil {
|
if error != nil {
|
||||||
return error
|
return error
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -30,6 +31,8 @@ var sampleConfig = `
|
||||||
servers = ["localhost"]
|
servers = ["localhost"]
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var defaultTimeout = 5 * time.Second
|
||||||
|
|
||||||
func (r *Disque) SampleConfig() string {
|
func (r *Disque) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
@ -107,7 +110,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
addr.Host = addr.Host + ":" + defaultPort
|
addr.Host = addr.Host + ":" + defaultPort
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := net.Dial("tcp", addr.Host)
|
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err)
|
return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err)
|
||||||
}
|
}
|
||||||
|
@ -132,6 +135,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
g.c = c
|
g.c = c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extend connection
|
||||||
|
g.c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||||
|
|
||||||
g.c.Write([]byte("info\r\n"))
|
g.c.Write([]byte("info\r\n"))
|
||||||
|
|
||||||
r := bufio.NewReader(g.c)
|
r := bufio.NewReader(g.c)
|
||||||
|
|
|
@ -34,6 +34,8 @@ var sampleConfig = `
|
||||||
domains = []
|
domains = []
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var defaultTimeout = time.Second * time.Duration(5)
|
||||||
|
|
||||||
func (d *Dovecot) SampleConfig() string { return sampleConfig }
|
func (d *Dovecot) SampleConfig() string { return sampleConfig }
|
||||||
|
|
||||||
const defaultPort = "24242"
|
const defaultPort = "24242"
|
||||||
|
@ -74,12 +76,15 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, doms map[s
|
||||||
return fmt.Errorf("Error: %s on url %s\n", err, addr)
|
return fmt.Errorf("Error: %s on url %s\n", err, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := net.Dial("tcp", addr)
|
c, err := net.DialTimeout("tcp", addr, defaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to connect to dovecot server '%s': %s", addr, err)
|
return fmt.Errorf("Unable to connect to dovecot server '%s': %s", addr, err)
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
|
// Extend connection
|
||||||
|
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||||
|
|
||||||
c.Write([]byte("EXPORT\tdomain\n\n"))
|
c.Write([]byte("EXPORT\tdomain\n\n"))
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
io.Copy(&buf, c)
|
io.Copy(&buf, c)
|
||||||
|
|
|
@ -81,7 +81,12 @@ type Elasticsearch struct {
|
||||||
|
|
||||||
// NewElasticsearch return a new instance of Elasticsearch
|
// NewElasticsearch return a new instance of Elasticsearch
|
||||||
func NewElasticsearch() *Elasticsearch {
|
func NewElasticsearch() *Elasticsearch {
|
||||||
return &Elasticsearch{client: http.DefaultClient}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
return &Elasticsearch{client: client}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SampleConfig returns sample configuration for this plugin.
|
// SampleConfig returns sample configuration for this plugin.
|
||||||
|
|
|
@ -129,8 +129,11 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
if g.client == nil {
|
if g.client == nil {
|
||||||
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
client := &http.Client{}
|
client := &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
g.client = client
|
g.client = client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -244,6 +244,11 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("httpjson", func() telegraf.Input {
|
inputs.Add("httpjson", func() telegraf.Input {
|
||||||
return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
return &HttpJson{client: RealHTTPClient{client: client}}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -70,6 +71,15 @@ type point struct {
|
||||||
Values map[string]interface{} `json:"values"`
|
Values map[string]interface{} `json:"values"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tr = &http.Transport{
|
||||||
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
// Gathers data from a particular URL
|
// Gathers data from a particular URL
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// acc : The telegraf Accumulator to use
|
// acc : The telegraf Accumulator to use
|
||||||
|
@ -81,7 +91,7 @@ func (i *InfluxDB) gatherURL(
|
||||||
acc telegraf.Accumulator,
|
acc telegraf.Accumulator,
|
||||||
url string,
|
url string,
|
||||||
) error {
|
) error {
|
||||||
resp, err := http.Get(url)
|
resp, err := client.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -160,6 +161,11 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("jolokia", func() telegraf.Input {
|
inputs.Add("jolokia", func() telegraf.Input {
|
||||||
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
return &Jolokia{jClient: &JolokiaClientImpl{client: client}}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -120,7 +121,10 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
|
func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
|
||||||
client := &http.Client{Transport: api.Transport}
|
client := &http.Client{
|
||||||
|
Transport: api.Transport,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
req, err := http.NewRequest("GET", api.url.String(), &b)
|
req, err := http.NewRequest("GET", api.url.String(), &b)
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -261,6 +262,15 @@ func (m *Mesos) removeGroup(j *map[string]interface{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tr = &http.Transport{
|
||||||
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
// This should not belong to the object
|
// This should not belong to the object
|
||||||
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
|
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
|
||||||
var jsonOut map[string]interface{}
|
var jsonOut map[string]interface{}
|
||||||
|
@ -282,7 +292,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
ts := strconv.Itoa(m.Timeout) + "ms"
|
ts := strconv.Itoa(m.Timeout) + "ms"
|
||||||
|
|
||||||
resp, err := http.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
|
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,8 +2,10 @@ package mysql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
@ -26,6 +28,8 @@ var sampleConfig = `
|
||||||
servers = ["tcp(127.0.0.1:3306)/"]
|
servers = ["tcp(127.0.0.1:3306)/"]
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var defaultTimeout = time.Second * time.Duration(5)
|
||||||
|
|
||||||
func (m *Mysql) SampleConfig() string {
|
func (m *Mysql) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
@ -122,6 +126,10 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
|
||||||
serv = ""
|
serv = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
serv, err := dsnAddTimeout(serv)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
db, err := sql.Open("mysql", serv)
|
db, err := sql.Open("mysql", serv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -207,6 +215,21 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dsnAddTimeout(dsn string) (string, error) {
|
||||||
|
u, err := url.Parse(dsn)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
v := u.Query()
|
||||||
|
|
||||||
|
// Only override timeout if not already defined
|
||||||
|
if _, ok := v["timeout"]; ok == false {
|
||||||
|
v.Add("timeout", defaultTimeout.String())
|
||||||
|
u.RawQuery = v.Encode()
|
||||||
|
}
|
||||||
|
return u.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("mysql", func() telegraf.Input {
|
inputs.Add("mysql", func() telegraf.Input {
|
||||||
return &Mysql{}
|
return &Mysql{}
|
||||||
|
|
|
@ -84,3 +84,38 @@ func TestMysqlParseDSN(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMysqlDNSAddTimeout(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
output string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"",
|
||||||
|
"?timeout=5s",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"127.0.0.1",
|
||||||
|
"127.0.0.1?timeout=5s",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"tcp(192.168.1.1:3306)/",
|
||||||
|
"tcp(192.168.1.1:3306)/?timeout=5s",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"root:passwd@tcp(192.168.1.1:3306)/?tls=false",
|
||||||
|
"root:passwd@tcp(192.168.1.1:3306)/?timeout=5s&tls=false",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
|
||||||
|
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
output, _ := parseDSN(test.input)
|
||||||
|
if output != test.output {
|
||||||
|
t.Errorf("Expected %s, got %s\n", test.output, output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -58,7 +58,10 @@ var tr = &http.Transport{
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
var client = &http.Client{Transport: tr}
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
resp, err := client.Get(addr.String())
|
resp, err := client.Get(addr.String())
|
||||||
|
|
|
@ -84,7 +84,10 @@ var tr = &http.Transport{
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
var client = &http.Client{Transport: tr}
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
|
func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
|
||||||
u, err := buildURL(e)
|
u, err := buildURL(e)
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Prometheus struct {
|
type Prometheus struct {
|
||||||
|
@ -51,8 +52,17 @@ func (g *Prometheus) Gather(acc telegraf.Accumulator) error {
|
||||||
return outerr
|
return outerr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tr = &http.Transport{
|
||||||
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
var client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||||
resp, err := http.Get(url)
|
resp, err := client.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,11 @@ func (r *RabbitMQ) Description() string {
|
||||||
|
|
||||||
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
|
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
|
||||||
if r.Client == nil {
|
if r.Client == nil {
|
||||||
r.Client = &http.Client{}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
|
r.Client = &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var errChan = make(chan error, len(gatherFunctions))
|
var errChan = make(chan error, len(gatherFunctions))
|
||||||
|
|
|
@ -177,8 +177,11 @@ func (r *Raindrops) getTags(addr *url.URL) map[string]string {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("raindrops", func() telegraf.Input {
|
inputs.Add("raindrops", func() telegraf.Input {
|
||||||
return &Raindrops{http_client: &http.Client{Transport: &http.Transport{
|
return &Raindrops{http_client: &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||||
}}}
|
},
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -30,6 +31,8 @@ var sampleConfig = `
|
||||||
servers = ["tcp://localhost:6379"]
|
servers = ["tcp://localhost:6379"]
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var defaultTimeout = 5 * time.Second
|
||||||
|
|
||||||
func (r *Redis) SampleConfig() string {
|
func (r *Redis) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
@ -120,12 +123,15 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
addr.Host = addr.Host + ":" + defaultPort
|
addr.Host = addr.Host + ":" + defaultPort
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := net.Dial("tcp", addr.Host)
|
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
|
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
|
// Extend connection
|
||||||
|
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||||
|
|
||||||
if addr.User != nil {
|
if addr.User != nil {
|
||||||
pwd, set := addr.User.Password()
|
pwd, set := addr.User.Password()
|
||||||
if set && pwd != "" {
|
if set && pwd != "" {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -20,7 +21,12 @@ type Riak struct {
|
||||||
|
|
||||||
// NewRiak return a new instance of Riak with a default http client
|
// NewRiak return a new instance of Riak with a default http client
|
||||||
func NewRiak() *Riak {
|
func NewRiak() *Riak {
|
||||||
return &Riak{client: http.DefaultClient}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
Timeout: time.Duration(4 * time.Second),
|
||||||
|
}
|
||||||
|
return &Riak{client: client}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type riakStats represents the data that is received from Riak
|
// Type riakStats represents the data that is received from Riak
|
||||||
|
|
|
@ -67,6 +67,9 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
|
// Extend connection
|
||||||
|
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||||
|
|
||||||
fmt.Fprintf(c, "%s\n", "mntr")
|
fmt.Fprintf(c, "%s\n", "mntr")
|
||||||
rdr := bufio.NewReader(c)
|
rdr := bufio.NewReader(c)
|
||||||
scanner := bufio.NewScanner(rdr)
|
scanner := bufio.NewScanner(rdr)
|
||||||
|
|
Loading…
Reference in New Issue