Merge branch 'reuse-transport'

This commit is contained in:
Daniel Nelson 2017-05-10 18:19:21 -07:00
commit a47aa0dcc2
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
6 changed files with 226 additions and 122 deletions

View File

@ -9,6 +9,7 @@
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation. - [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
- [#2716](https://github.com/influxdata/telegraf/pull/2716): Systemd does not see all shutdowns as failures - [#2716](https://github.com/influxdata/telegraf/pull/2716): Systemd does not see all shutdowns as failures
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
## v1.3 [unreleased] ## v1.3 [unreleased]

View File

@ -29,6 +29,8 @@ type Apache struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5 n.ResponseTimeout.Duration = time.Second * 5
} }
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(n.Urls)) wg.Add(len(n.Urls))
for _, u := range n.Urls { for _, u := range n.Urls {
@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { func (n *Apache) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
var tr *http.Transport n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
if addr.Scheme == "https" { return nil, err
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return err
}
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
} else {
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
} }
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: n.ResponseTimeout.Duration, TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
} }
return client, nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req, err := http.NewRequest("GET", addr.String(), nil) req, err := http.NewRequest("GET", addr.String(), nil)
if err != nil { if err != nil {
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req.SetBasicAuth(n.Username, n.Password) req.SetBasicAuth(n.Username, n.Password)
} }
resp, err := client.Do(req) resp, err := n.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
} }

View File

@ -25,7 +25,6 @@ type HTTPResponse struct {
Headers map[string]string Headers map[string]string
FollowRedirects bool FollowRedirects bool
ResponseStringMatch string ResponseStringMatch string
compiledStringMatch *regexp.Regexp
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -35,6 +34,9 @@ type HTTPResponse struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
} }
// Description returns the plugin Description // Description returns the plugin Description
@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
tr := &http.Transport{
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: h.ResponseTimeout.Duration, DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: h.ResponseTimeout.Duration,
} }
if h.FollowRedirects == false { if h.FollowRedirects == false {
@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
} }
// HTTPGather gathers all fields and returns any errors it encounters // HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) { func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
// Prepare fields // Prepare fields
fields := make(map[string]interface{}) fields := make(map[string]interface{})
client, err := h.createHttpClient()
if err != nil {
return nil, err
}
var body io.Reader var body io.Reader
if h.Body != "" { if h.Body != "" {
body = strings.NewReader(h.Body) body = strings.NewReader(h.Body)
@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Start Timer // Start Timer
start := time.Now() start := time.Now()
resp, err := client.Do(request) resp, err := h.client.Do(request)
if err != nil { if err != nil {
if h.FollowRedirects { if h.FollowRedirects {
return nil, err return nil, err
@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
return nil, err return nil, err
} }
} }
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fields["response_time"] = time.Since(start).Seconds() fields["response_time"] = time.Since(start).Seconds()
fields["http_response_code"] = resp.StatusCode fields["http_response_code"] = resp.StatusCode
@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Prepare data // Prepare data
tags := map[string]string{"server": h.Address, "method": h.Method} tags := map[string]string{"server": h.Address, "method": h.Method}
var fields map[string]interface{} var fields map[string]interface{}
if h.client == nil {
client, err := h.createHttpClient()
if err != nil {
return err
}
h.client = client
}
// Gather data // Gather data
fields, err = h.HTTPGather() fields, err = h.httpGather()
if err != nil { if err != nil {
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
"Host": "Hello", "Host": "Hello",
}, },
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestFields(t *testing.T) { func TestFields(t *testing.T) {
@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestRedirects(t *testing.T) { func TestRedirects(t *testing.T) {
@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/badredirect", Address: ts.URL + "/badredirect",
@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.Error(t, err) require.Error(t, err)
value, ok = acc.IntField("http_response", "http_response_code")
require.False(t, ok)
} }
func TestMethod(t *testing.T) { func TestMethod(t *testing.T) {
@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/mustbepostmethod", Address: ts.URL + "/mustbepostmethod",
@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
//check that lowercase methods work correctly //check that lowercase methods work correctly
h = &HTTPResponse{ h = &HTTPResponse{
@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
} }
func TestBody(t *testing.T) { func TestBody(t *testing.T) {
@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/musthaveabody", Address: ts.URL + "/musthaveabody",
@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusBadRequest, value)
} }
func TestStringMatch(t *testing.T) { func TestStringMatch(t *testing.T) {
@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchJson(t *testing.T) { func TestStringMatchJson(t *testing.T) {
@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchFail(t *testing.T) { func TestStringMatchFail(t *testing.T) {
@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 0, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 0, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test with sleep in short mode.")
}
mux := setUpTestMux() mux := setUpTestMux()
ts := httptest.NewServer(mux) ts := httptest.NewServer(mux)
defer ts.Close() defer ts.Close()
@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
_, err := h.HTTPGather() var acc testutil.Accumulator
require.Error(t, err) err := h.Gather(&acc)
require.NoError(t, err)
ok := acc.HasIntField("http_response", "http_response_code")
require.False(t, ok)
} }

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -32,6 +31,8 @@ type Prometheus struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHttpClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
for _, serv := range p.Urls { for _, serv := range p.Urls {
@ -89,29 +98,30 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout.Duration,
}
return client, nil
}
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil) var req, err = http.NewRequest("GET", url, nil)
req.Header.Add("Accept", acceptHeader) req.Header.Add("Accept", acceptHeader)
var token []byte var token []byte
var resp *http.Response var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
DisableKeepAlives: true,
}
if p.BearerToken != "" { if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken) token, err = ioutil.ReadFile(p.BearerToken)
if err != nil { if err != nil {
@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
req.Header.Set("Authorization", "Bearer "+string(token)) req.Header.Set("Authorization", "Bearer "+string(token))
} }
resp, err = rt.RoundTrip(req) resp, err = p.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err) return fmt.Errorf("error making HTTP request to %s: %s", url, err)
} }

View File

@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
} }
return false return false
} }
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int)
return v, ok
}
}
}
}
return 0, false
}
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(float64)
return v, ok
}
}
}
}
return 0.0, false
}
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(string)
return v, ok
}
}
}
}
return "", false
}