Allow connecting to prometheus via unix socket (#4798)

This commit is contained in:
Greg 2018-11-02 18:51:40 -06:00 committed by Daniel Nelson
parent ad5fcf8efb
commit ddcbfe79bb
3 changed files with 45 additions and 18 deletions

View File

@ -28,6 +28,8 @@ in Prometheus format.
# insecure_skip_verify = false # insecure_skip_verify = false
``` ```
`urls` can contain a unix socket as well. If a different path is required (default is `/metrics` for both http[s] and unix) for a unix socket, add `path` as a query parameter as follows: `unix:///var/run/prometheus.sock?path=/custom/metrics`
#### Kubernetes Service Discovery #### Kubernetes Service Discovery
URLs listed in the `kubernetes_services` parameter will be expanded URLs listed in the `kubernetes_services` parameter will be expanded

View File

@ -125,7 +125,7 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil { if p.client == nil {
client, err := p.createHttpClient() client, err := p.createHTTPClient()
if err != nil { if err != nil {
return err return err
} }
@ -151,16 +151,7 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
var tr = &http.Transport{ func (p *Prometheus) createHTTPClient() (*http.Client, error) {
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := p.ClientConfig.TLSConfig() tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil { if err != nil {
return nil, err return nil, err
@ -178,11 +169,39 @@ func (p *Prometheus) createHttpClient() (*http.Client, error) {
} }
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", u.URL.String(), nil) var req *http.Request
req.Header.Add("Accept", acceptHeader) var err error
var token []byte var uClient *http.Client
var resp *http.Response if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
path = "/metrics"
}
req, err = http.NewRequest("GET", "http://localhost"+path, nil)
// ignore error because it's been handled before getting here
tlsCfg, _ := p.ClientConfig.TLSConfig()
uClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
Dial: func(network, addr string) (net.Conn, error) {
c, err := net.Dial("unix", u.URL.Path)
return c, err
},
},
Timeout: p.ResponseTimeout.Duration,
}
} else {
if u.URL.Path == "" {
u.URL.Path = "/metrics"
}
req, err = http.NewRequest("GET", u.URL.String(), nil)
}
req.Header.Add("Accept", acceptHeader)
var token []byte
if p.BearerToken != "" { if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken) token, err = ioutil.ReadFile(p.BearerToken)
if err != nil { if err != nil {
@ -191,11 +210,17 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
req.Header.Set("Authorization", "Bearer "+string(token)) req.Header.Set("Authorization", "Bearer "+string(token))
} }
var resp *http.Response
if u.URL.Scheme != "unix" {
resp, err = p.client.Do(req) resp, err = p.client.Do(req)
} else {
resp, err = uClient.Do(req)
}
if err != nil { if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err) return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status) return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status)
} }
@ -210,7 +235,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
return fmt.Errorf("error reading metrics for %s: %s", return fmt.Errorf("error reading metrics for %s: %s",
u.URL, err) u.URL, err)
} }
// Add (or not) collected metrics
for _, metric := range metrics { for _, metric := range metrics {
tags := metric.Tags() tags := metric.Tags()
// strip user and password from URL // strip user and password from URL

View File

@ -50,7 +50,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
assert.True(t, acc.HasFloatField("test_metric", "value")) assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.False(t, acc.HasTag("test_metric", "address")) assert.False(t, acc.HasTag("test_metric", "address"))
assert.True(t, acc.TagValue("test_metric", "url") == ts.URL) assert.True(t, acc.TagValue("test_metric", "url") == ts.URL+"/metrics")
} }
func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {