Merge branch 'reuse-transport'
This commit is contained in:
commit
0ed404e7ba
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
|
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
|
||||||
- [#2716](https://github.com/influxdata/telegraf/pull/2716): Systemd does not see all shutdowns as failures
|
- [#2716](https://github.com/influxdata/telegraf/pull/2716): Systemd does not see all shutdowns as failures
|
||||||
|
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
|
||||||
|
|
||||||
## v1.3 [unreleased]
|
## v1.3 [unreleased]
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,8 @@ type Apache struct {
|
||||||
SSLKey string `toml:"ssl_key"`
|
SSLKey string `toml:"ssl_key"`
|
||||||
// Use SSL but skip chain & host verification
|
// Use SSL but skip chain & host verification
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
|
||||||
n.ResponseTimeout.Duration = time.Second * 5
|
n.ResponseTimeout.Duration = time.Second * 5
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n.client == nil {
|
||||||
|
client, err := n.createHttpClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n.client = client
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(len(n.Urls))
|
wg.Add(len(n.Urls))
|
||||||
for _, u := range n.Urls {
|
for _, u := range n.Urls {
|
||||||
|
@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
func (n *Apache) createHttpClient() (*http.Client, error) {
|
||||||
|
|
||||||
var tr *http.Transport
|
|
||||||
|
|
||||||
if addr.Scheme == "https" {
|
|
||||||
tlsCfg, err := internal.GetTLSConfig(
|
tlsCfg, err := internal.GetTLSConfig(
|
||||||
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
|
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
|
||||||
tr = &http.Transport{
|
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
|
||||||
TLSClientConfig: tlsCfg,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tr = &http.Transport{
|
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: tr,
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
Timeout: n.ResponseTimeout.Duration,
|
Timeout: n.ResponseTimeout.Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
req, err := http.NewRequest("GET", addr.String(), nil)
|
req, err := http.NewRequest("GET", addr.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
|
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
|
||||||
|
@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||||
req.SetBasicAuth(n.Username, n.Password)
|
req.SetBasicAuth(n.Username, n.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := n.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
|
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ type HTTPResponse struct {
|
||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
FollowRedirects bool
|
FollowRedirects bool
|
||||||
ResponseStringMatch string
|
ResponseStringMatch string
|
||||||
compiledStringMatch *regexp.Regexp
|
|
||||||
|
|
||||||
// Path to CA file
|
// Path to CA file
|
||||||
SSLCA string `toml:"ssl_ca"`
|
SSLCA string `toml:"ssl_ca"`
|
||||||
|
@ -35,6 +34,9 @@ type HTTPResponse struct {
|
||||||
SSLKey string `toml:"ssl_key"`
|
SSLKey string `toml:"ssl_key"`
|
||||||
// Use SSL but skip chain & host verification
|
// Use SSL but skip chain & host verification
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
compiledStringMatch *regexp.Regexp
|
||||||
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// Description returns the plugin Description
|
// Description returns the plugin Description
|
||||||
|
@ -88,12 +90,11 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tr := &http.Transport{
|
|
||||||
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
|
|
||||||
TLSClientConfig: tlsCfg,
|
|
||||||
}
|
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: tr,
|
Transport: &http.Transport{
|
||||||
|
DisableKeepAlives: true,
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
Timeout: h.ResponseTimeout.Duration,
|
Timeout: h.ResponseTimeout.Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPGather gathers all fields and returns any errors it encounters
|
// HTTPGather gathers all fields and returns any errors it encounters
|
||||||
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
|
||||||
// Prepare fields
|
// Prepare fields
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
|
|
||||||
client, err := h.createHttpClient()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var body io.Reader
|
var body io.Reader
|
||||||
if h.Body != "" {
|
if h.Body != "" {
|
||||||
body = strings.NewReader(h.Body)
|
body = strings.NewReader(h.Body)
|
||||||
|
@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
||||||
|
|
||||||
// Start Timer
|
// Start Timer
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
resp, err := client.Do(request)
|
resp, err := h.client.Do(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if h.FollowRedirects {
|
if h.FollowRedirects {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
io.Copy(ioutil.Discard, resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
fields["response_time"] = time.Since(start).Seconds()
|
fields["response_time"] = time.Since(start).Seconds()
|
||||||
fields["http_response_code"] = resp.StatusCode
|
fields["http_response_code"] = resp.StatusCode
|
||||||
|
|
||||||
|
@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
|
||||||
// Prepare data
|
// Prepare data
|
||||||
tags := map[string]string{"server": h.Address, "method": h.Method}
|
tags := map[string]string{"server": h.Address, "method": h.Method}
|
||||||
var fields map[string]interface{}
|
var fields map[string]interface{}
|
||||||
|
|
||||||
|
if h.client == nil {
|
||||||
|
client, err := h.createHttpClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.client = client
|
||||||
|
}
|
||||||
|
|
||||||
// Gather data
|
// Gather data
|
||||||
fields, err = h.HTTPGather()
|
fields, err = h.httpGather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
|
||||||
"Host": "Hello",
|
"Host": "Hello",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusOK, value)
|
||||||
assert.NotNil(t, fields["response_time"])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFields(t *testing.T) {
|
func TestFields(t *testing.T) {
|
||||||
|
@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusOK, value)
|
||||||
assert.NotNil(t, fields["response_time"])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRedirects(t *testing.T) {
|
func TestRedirects(t *testing.T) {
|
||||||
|
@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
|
||||||
h = &HTTPResponse{
|
h = &HTTPResponse{
|
||||||
Address: ts.URL + "/badredirect",
|
Address: ts.URL + "/badredirect",
|
||||||
|
@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err = h.HTTPGather()
|
acc = testutil.Accumulator{}
|
||||||
|
err = h.Gather(&acc)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
|
value, ok = acc.IntField("http_response", "http_response_code")
|
||||||
|
require.False(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMethod(t *testing.T) {
|
func TestMethod(t *testing.T) {
|
||||||
|
@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
|
||||||
h = &HTTPResponse{
|
h = &HTTPResponse{
|
||||||
Address: ts.URL + "/mustbepostmethod",
|
Address: ts.URL + "/mustbepostmethod",
|
||||||
|
@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err = h.HTTPGather()
|
acc = testutil.Accumulator{}
|
||||||
|
err = h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok = acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusMethodNotAllowed, value)
|
||||||
|
|
||||||
//check that lowercase methods work correctly
|
//check that lowercase methods work correctly
|
||||||
h = &HTTPResponse{
|
h = &HTTPResponse{
|
||||||
|
@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err = h.HTTPGather()
|
acc = testutil.Accumulator{}
|
||||||
|
err = h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok = acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusMethodNotAllowed, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBody(t *testing.T) {
|
func TestBody(t *testing.T) {
|
||||||
|
@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
|
||||||
h = &HTTPResponse{
|
h = &HTTPResponse{
|
||||||
Address: ts.URL + "/musthaveabody",
|
Address: ts.URL + "/musthaveabody",
|
||||||
|
@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err = h.HTTPGather()
|
acc = testutil.Accumulator{}
|
||||||
|
err = h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
value, ok = acc.IntField("http_response", "http_response_code")
|
||||||
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"])
|
require.True(t, ok)
|
||||||
}
|
require.Equal(t, http.StatusBadRequest, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStringMatch(t *testing.T) {
|
func TestStringMatch(t *testing.T) {
|
||||||
|
@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
|
||||||
}
|
|
||||||
assert.Equal(t, 1, fields["response_string_match"])
|
|
||||||
assert.NotNil(t, fields["response_time"])
|
|
||||||
|
|
||||||
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
value, ok = acc.IntField("http_response", "response_string_match")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 1, value)
|
||||||
|
_, ok = acc.FloatField("http_response", "response_time")
|
||||||
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStringMatchJson(t *testing.T) {
|
func TestStringMatchJson(t *testing.T) {
|
||||||
|
@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
|
||||||
}
|
|
||||||
assert.Equal(t, 1, fields["response_string_match"])
|
|
||||||
assert.NotNil(t, fields["response_time"])
|
|
||||||
|
|
||||||
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
value, ok = acc.IntField("http_response", "response_string_match")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 1, value)
|
||||||
|
_, ok = acc.FloatField("http_response", "response_time")
|
||||||
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStringMatchFail(t *testing.T) {
|
func TestStringMatchFail(t *testing.T) {
|
||||||
|
@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
fields, err := h.HTTPGather()
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.NotEmpty(t, fields)
|
|
||||||
if assert.NotNil(t, fields["http_response_code"]) {
|
|
||||||
assert.Equal(t, http.StatusOK, fields["http_response_code"])
|
|
||||||
}
|
|
||||||
assert.Equal(t, 0, fields["response_string_match"])
|
|
||||||
assert.NotNil(t, fields["response_time"])
|
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := h.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
value, ok := acc.IntField("http_response", "http_response_code")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, http.StatusOK, value)
|
||||||
|
value, ok = acc.IntField("http_response", "response_string_match")
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 0, value)
|
||||||
|
_, ok = acc.FloatField("http_response", "response_time")
|
||||||
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTimeout(t *testing.T) {
|
func TestTimeout(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping test with sleep in short mode.")
|
||||||
|
}
|
||||||
|
|
||||||
mux := setUpTestMux()
|
mux := setUpTestMux()
|
||||||
ts := httptest.NewServer(mux)
|
ts := httptest.NewServer(mux)
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
|
||||||
},
|
},
|
||||||
FollowRedirects: true,
|
FollowRedirects: true,
|
||||||
}
|
}
|
||||||
_, err := h.HTTPGather()
|
var acc testutil.Accumulator
|
||||||
require.Error(t, err)
|
err := h.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ok := acc.HasIntField("http_response", "http_response_code")
|
||||||
|
require.False(t, ok)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -32,6 +31,8 @@ type Prometheus struct {
|
||||||
SSLKey string `toml:"ssl_key"`
|
SSLKey string `toml:"ssl_key"`
|
||||||
// Use SSL but skip chain & host verification
|
// Use SSL but skip chain & host verification
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
|
||||||
// Reads stats from all configured servers accumulates stats.
|
// Reads stats from all configured servers accumulates stats.
|
||||||
// Returns one of the errors encountered while gather stats (if any).
|
// Returns one of the errors encountered while gather stats (if any).
|
||||||
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
|
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
|
||||||
|
if p.client == nil {
|
||||||
|
client, err := p.createHttpClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.client = client
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, serv := range p.Urls {
|
for _, serv := range p.Urls {
|
||||||
|
@ -89,29 +98,30 @@ var client = &http.Client{
|
||||||
Timeout: time.Duration(4 * time.Second),
|
Timeout: time.Duration(4 * time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Prometheus) createHttpClient() (*http.Client, error) {
|
||||||
|
tlsCfg, err := internal.GetTLSConfig(
|
||||||
|
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
DisableKeepAlives: true,
|
||||||
|
},
|
||||||
|
Timeout: p.ResponseTimeout.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||||
var req, err = http.NewRequest("GET", url, nil)
|
var req, err = http.NewRequest("GET", url, nil)
|
||||||
req.Header.Add("Accept", acceptHeader)
|
req.Header.Add("Accept", acceptHeader)
|
||||||
var token []byte
|
var token []byte
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
|
|
||||||
tlsCfg, err := internal.GetTLSConfig(
|
|
||||||
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var rt http.RoundTripper = &http.Transport{
|
|
||||||
Dial: (&net.Dialer{
|
|
||||||
Timeout: 5 * time.Second,
|
|
||||||
KeepAlive: 30 * time.Second,
|
|
||||||
}).Dial,
|
|
||||||
TLSHandshakeTimeout: 5 * time.Second,
|
|
||||||
TLSClientConfig: tlsCfg,
|
|
||||||
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
|
|
||||||
DisableKeepAlives: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.BearerToken != "" {
|
if p.BearerToken != "" {
|
||||||
token, err = ioutil.ReadFile(p.BearerToken)
|
token, err = ioutil.ReadFile(p.BearerToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||||
req.Header.Set("Authorization", "Bearer "+string(token))
|
req.Header.Set("Authorization", "Bearer "+string(token))
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = rt.RoundTrip(req)
|
resp, err = p.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(int)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(float64)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0.0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(string)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue