From 9e8500287534b1b30a5ba2ece3cfa4a848c3f272 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 9 May 2017 16:19:56 -0700 Subject: [PATCH 1/5] Fix apache input creation of transport on every gather. --- plugins/inputs/apache/apache.go | 45 ++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index b50860cfb..1cf223e24 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -29,6 +29,8 @@ type Apache struct { SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool + + client *http.Client } var sampleConfig = ` @@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error { n.ResponseTimeout.Duration = time.Second * 5 } + if n.client == nil { + client, err := n.createHttpClient() + if err != nil { + return err + } + n.client = client + } + var wg sync.WaitGroup wg.Add(len(n.Urls)) for _, u := range n.Urls { @@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error { return nil } -func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { - - var tr *http.Transport - - if addr.Scheme == "https" { - tlsCfg, err := internal.GetTLSConfig( - n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify) - if err != nil { - return err - } - tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), - TLSClientConfig: tlsCfg, - } - } else { - tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), - } +func (n *Apache) createHttpClient() (*http.Client, error) { + tlsCfg, err := internal.GetTLSConfig( + n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify) + if err != nil { + return nil, err } client := &http.Client{ - Transport: tr, - Timeout: n.ResponseTimeout.Duration, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: n.ResponseTimeout.Duration, } + return client, nil +} + +func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { req, err := http.NewRequest("GET", addr.String(), nil) if err != nil { return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err) @@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { req.SetBasicAuth(n.Username, n.Password) } - resp, err := client.Do(req) + resp, err := n.client.Do(req) if err != nil { return fmt.Errorf("error on request to %s : %s\n", addr.String(), err) } From b6312cf13cbf1c28b848992e4c0fa7f751847df5 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 9 May 2017 16:20:43 -0700 Subject: [PATCH 2/5] Fix prometheus input creation of transport on every gather --- plugins/inputs/prometheus/prometheus.go | 48 +++++++++++++++---------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 2e613e2c0..73e202d7a 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io/ioutil" - "net" "net/http" "sync" "time" @@ -32,6 +31,8 @@ type Prometheus struct { SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool + + client *http.Client } var sampleConfig = ` @@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error") // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). func (p *Prometheus) Gather(acc telegraf.Accumulator) error { + if p.client == nil { + client, err := p.createHttpClient() + if err != nil { + return err + } + p.client = client + } + var wg sync.WaitGroup for _, serv := range p.Urls { @@ -89,29 +98,30 @@ var client = &http.Client{ Timeout: time.Duration(4 * time.Second), } +func (p *Prometheus) createHttpClient() (*http.Client, error) { + tlsCfg, err := internal.GetTLSConfig( + p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + DisableKeepAlives: true, + }, + Timeout: p.ResponseTimeout.Duration, + } + + return client, nil +} + func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { var req, err = http.NewRequest("GET", url, nil) req.Header.Add("Accept", acceptHeader) var token []byte var resp *http.Response - tlsCfg, err := internal.GetTLSConfig( - p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) - if err != nil { - return err - } - - var rt http.RoundTripper = &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 5 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: tlsCfg, - ResponseHeaderTimeout: p.ResponseTimeout.Duration, - DisableKeepAlives: true, - } - if p.BearerToken != "" { token, err = ioutil.ReadFile(p.BearerToken) if err != nil { @@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { req.Header.Set("Authorization", "Bearer "+string(token)) } - resp, err = rt.RoundTrip(req) + resp, err = p.client.Do(req) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) } From 3381ac8f94a1da8dfc0d91d53d95582a905a05d2 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 9 May 2017 16:21:04 -0700 Subject: [PATCH 3/5] Fix http_response input creation of transport on every gather --- plugins/inputs/http_response/http_response.go | 34 ++-- .../http_response/http_response_test.go | 164 +++++++++++------- testutil/accumulator.go | 50 ++++++ 3 files changed, 166 insertions(+), 82 deletions(-) diff --git a/plugins/inputs/http_response/http_response.go b/plugins/inputs/http_response/http_response.go index 111e35518..7dd043043 100644 --- a/plugins/inputs/http_response/http_response.go +++ b/plugins/inputs/http_response/http_response.go @@ -25,7 +25,6 @@ type HTTPResponse struct { Headers map[string]string FollowRedirects bool ResponseStringMatch string - compiledStringMatch *regexp.Regexp // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -35,6 +34,9 @@ type HTTPResponse struct { SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool + + compiledStringMatch *regexp.Regexp + client *http.Client } // Description returns the plugin Description @@ -88,13 +90,11 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) { if err != nil { return nil, err } - tr := &http.Transport{ - ResponseHeaderTimeout: h.ResponseTimeout.Duration, - TLSClientConfig: tlsCfg, - } client := &http.Client{ - Transport: tr, - Timeout: h.ResponseTimeout.Duration, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: h.ResponseTimeout.Duration, } if h.FollowRedirects == false { @@ -106,15 +106,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) { } // HTTPGather gathers all fields and returns any errors it encounters -func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) { +func (h *HTTPResponse) httpGather() (map[string]interface{}, error) { // Prepare fields fields := make(map[string]interface{}) - client, err := h.createHttpClient() - if err != nil { - return nil, err - } - var body io.Reader if h.Body != "" { body = strings.NewReader(h.Body) @@ -133,7 +128,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) { // Start Timer start := time.Now() - resp, err := client.Do(request) + resp, err := h.client.Do(request) if err != nil { if h.FollowRedirects { return nil, err @@ -202,8 +197,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error { // Prepare data tags := map[string]string{"server": h.Address, "method": h.Method} var fields map[string]interface{} + + if h.client == nil { + client, err := h.createHttpClient() + if err != nil { + return err + } + h.client = client + } + // Gather data - fields, err = h.HTTPGather() + fields, err = h.httpGather() if err != nil { return err } diff --git a/plugins/inputs/http_response/http_response_test.go b/plugins/inputs/http_response/http_response_test.go index b65b1f954..babb23dab 100644 --- a/plugins/inputs/http_response/http_response_test.go +++ b/plugins/inputs/http_response/http_response_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) { "Host": "Hello", }, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } - assert.NotNil(t, fields["response_time"]) + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) } func TestFields(t *testing.T) { @@ -97,13 +98,14 @@ func TestFields(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } - assert.NotNil(t, fields["response_time"]) + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) } func TestRedirects(t *testing.T) { @@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) h = &HTTPResponse{ Address: ts.URL + "/badredirect", @@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) { }, FollowRedirects: true, } - fields, err = h.HTTPGather() + acc = testutil.Accumulator{} + err = h.Gather(&acc) require.Error(t, err) + + value, ok = acc.IntField("http_response", "http_response_code") + require.False(t, ok) } func TestMethod(t *testing.T) { @@ -157,12 +164,13 @@ func TestMethod(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) h = &HTTPResponse{ Address: ts.URL + "/mustbepostmethod", @@ -174,12 +182,13 @@ func TestMethod(t *testing.T) { }, FollowRedirects: true, } - fields, err = h.HTTPGather() + acc = testutil.Accumulator{} + err = h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) - } + + value, ok = acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusMethodNotAllowed, value) //check that lowercase methods work correctly h = &HTTPResponse{ @@ -192,12 +201,13 @@ func TestMethod(t *testing.T) { }, FollowRedirects: true, } - fields, err = h.HTTPGather() + acc = testutil.Accumulator{} + err = h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) - } + + value, ok = acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusMethodNotAllowed, value) } func TestBody(t *testing.T) { @@ -215,12 +225,13 @@ func TestBody(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) h = &HTTPResponse{ Address: ts.URL + "/musthaveabody", @@ -231,12 +242,13 @@ func TestBody(t *testing.T) { }, FollowRedirects: true, } - fields, err = h.HTTPGather() + acc = testutil.Accumulator{} + err = h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusBadRequest, fields["http_response_code"]) - } + + value, ok = acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusBadRequest, value) } func TestStringMatch(t *testing.T) { @@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } - assert.Equal(t, 1, fields["response_string_match"]) - assert.NotNil(t, fields["response_time"]) + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) + value, ok = acc.IntField("http_response", "response_string_match") + require.True(t, ok) + require.Equal(t, 1, value) + _, ok = acc.FloatField("http_response", "response_time") + require.True(t, ok) } func TestStringMatchJson(t *testing.T) { @@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() + var acc testutil.Accumulator + err := h.Gather(&acc) require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } - assert.Equal(t, 1, fields["response_string_match"]) - assert.NotNil(t, fields["response_time"]) + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) + value, ok = acc.IntField("http_response", "response_string_match") + require.True(t, ok) + require.Equal(t, 1, value) + _, ok = acc.FloatField("http_response", "response_time") + require.True(t, ok) } func TestStringMatchFail(t *testing.T) { @@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) { }, FollowRedirects: true, } - fields, err := h.HTTPGather() - require.NoError(t, err) - assert.NotEmpty(t, fields) - if assert.NotNil(t, fields["http_response_code"]) { - assert.Equal(t, http.StatusOK, fields["http_response_code"]) - } - assert.Equal(t, 0, fields["response_string_match"]) - assert.NotNil(t, fields["response_time"]) + var acc testutil.Accumulator + err := h.Gather(&acc) + require.NoError(t, err) + + value, ok := acc.IntField("http_response", "http_response_code") + require.True(t, ok) + require.Equal(t, http.StatusOK, value) + value, ok = acc.IntField("http_response", "response_string_match") + require.True(t, ok) + require.Equal(t, 0, value) + _, ok = acc.FloatField("http_response", "response_time") + require.True(t, ok) } func TestTimeout(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test with sleep in short mode.") + } + mux := setUpTestMux() ts := httptest.NewServer(mux) defer ts.Close() @@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) { }, FollowRedirects: true, } - _, err := h.HTTPGather() - require.Error(t, err) + var acc testutil.Accumulator + err := h.Gather(&acc) + require.NoError(t, err) + + ok := acc.HasIntField("http_response", "http_response_code") + require.False(t, ok) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 2ff5bc667..c0f2caf22 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool { } return false } + +func (a *Accumulator) IntField(measurement string, field string) (int, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(int) + return v, ok + } + } + } + } + + return 0, false +} + +func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(float64) + return v, ok + } + } + } + } + + return 0.0, false +} + +func (a *Accumulator) StringField(measurement string, field string) (string, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(string) + return v, ok + } + } + } + } + return "", false +} From f42768ed2e795dd7ba61a81f19f08a1c64962bc6 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 10 May 2017 13:11:33 -0700 Subject: [PATCH 4/5] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55baaccef..9184bf4d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation. - [#2716](https://github.com/influxdata/telegraf/pull/2716): Systemd does not see all shutdowns as failures +- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins ## v1.3 [unreleased] From 1cc7fe7f3dd4e5c7355911db135bb57e7093fedb Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 10 May 2017 14:40:55 -0700 Subject: [PATCH 5/5] Ensure keep-alive is not used in http_response input. Using Keep-Alive would change the timing for already established connections. Previous to this commit, Keep-Alive worked only when using a response_string_match due to failure to close the request body. --- plugins/inputs/http_response/http_response.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/http_response/http_response.go b/plugins/inputs/http_response/http_response.go index 7dd043043..cd3d735d2 100644 --- a/plugins/inputs/http_response/http_response.go +++ b/plugins/inputs/http_response/http_response.go @@ -92,7 +92,8 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) { } client := &http.Client{ Transport: &http.Transport{ - TLSClientConfig: tlsCfg, + DisableKeepAlives: true, + TLSClientConfig: tlsCfg, }, Timeout: h.ResponseTimeout.Duration, } @@ -140,6 +141,11 @@ func (h *HTTPResponse) httpGather() (map[string]interface{}, error) { return nil, err } } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + fields["response_time"] = time.Since(start).Seconds() fields["http_response_code"] = resp.StatusCode