Add support for sending HTTP Basic Auth in influxdb input (#6668)

This commit is contained in:
Daniel Nelson 2019-11-20 12:20:48 -08:00 committed by reimda
parent 70ff63060a
commit 8e0eb5a7db
3 changed files with 95 additions and 5 deletions

View File

@ -20,6 +20,10 @@ InfluxDB-formatted endpoints. See below for more information.
"http://localhost:8086/debug/vars" "http://localhost:8086/debug/vars"
] ]
## Username and password to send using HTTP Basic Authentication.
# username = ""
# password = ""
## Optional TLS Config ## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"

View File

@ -1,9 +1,10 @@
package influxdb package influxdb
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "io"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -14,9 +15,28 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
const (
maxErrorResponseBodyLength = 1024
)
type APIError struct {
StatusCode int
Reason string
Description string `json:"error"`
}
func (e *APIError) Error() string {
if e.Description != "" {
return e.Reason + ": " + e.Description
}
return e.Reason
}
type InfluxDB struct { type InfluxDB struct {
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Timeout internal.Duration Username string `toml:"username"`
Password string `toml:"password"`
Timeout internal.Duration `toml:"timeout"`
tls.ClientConfig tls.ClientConfig
client *http.Client client *http.Client
@ -38,6 +58,10 @@ func (*InfluxDB) SampleConfig() string {
"http://localhost:8086/debug/vars" "http://localhost:8086/debug/vars"
] ]
## Username and password to send using HTTP Basic Authentication.
# username = ""
# password = ""
## Optional TLS Config ## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"
@ -75,7 +99,7 @@ func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
go func(url string) { go func(url string) {
defer wg.Done() defer wg.Done()
if err := i.gatherURL(acc, url); err != nil { if err := i.gatherURL(acc, url); err != nil {
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err)) acc.AddError(err)
} }
}(u) }(u)
} }
@ -135,12 +159,27 @@ func (i *InfluxDB) gatherURL(
shardCounter := 0 shardCounter := 0
now := time.Now() now := time.Now()
resp, err := i.client.Get(url) req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
if i.Username != "" || i.Password != "" {
req.SetBasicAuth(i.Username, i.Password)
}
req.Header.Set("User-Agent", "Telegraf/"+internal.Version())
resp, err := i.client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return readResponseError(resp)
}
// It would be nice to be able to decode into a map[string]point, but // It would be nice to be able to decode into a map[string]point, but
// we'll get a decoder error like: // we'll get a decoder error like:
// `json: cannot unmarshal array into Go value of type influxdb.point` // `json: cannot unmarshal array into Go value of type influxdb.point`
@ -255,6 +294,27 @@ func (i *InfluxDB) gatherURL(
return nil return nil
} }
func readResponseError(resp *http.Response) error {
apiError := &APIError{
StatusCode: resp.StatusCode,
Reason: resp.Status,
}
var buf bytes.Buffer
r := io.LimitReader(resp.Body, maxErrorResponseBodyLength)
_, err := buf.ReadFrom(r)
if err != nil {
return apiError
}
err = json.Unmarshal(buf.Bytes(), apiError)
if err != nil {
return apiError
}
return apiError
}
func init() { func init() {
inputs.Add("influxdb", func() telegraf.Input { inputs.Add("influxdb", func() telegraf.Input {
return &InfluxDB{ return &InfluxDB{

View File

@ -1,6 +1,7 @@
package influxdb_test package influxdb_test
import ( import (
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
@ -178,6 +179,31 @@ func TestErrorHandling404(t *testing.T) {
require.Error(t, acc.GatherError(plugin.Gather)) require.Error(t, acc.GatherError(plugin.Gather))
} }
func TestErrorResponse(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(`{"error": "unable to parse authentication credentials"}`))
}))
defer ts.Close()
plugin := &influxdb.InfluxDB{
URLs: []string{ts.URL},
}
var acc testutil.Accumulator
err := plugin.Gather(&acc)
require.NoError(t, err)
expected := []error{
&influxdb.APIError{
StatusCode: http.StatusUnauthorized,
Reason: fmt.Sprintf("%d %s", http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)),
Description: "unable to parse authentication credentials",
},
}
require.Equal(t, expected, acc.Errors)
}
const basicJSON = ` const basicJSON = `
{ {
"_1": { "_1": {