Add method, basic auth, and tls support to http output

This commit is contained in:
Daniel Nelson 2018-05-14 17:11:44 -07:00
parent ca11b6328e
commit 797c62c790
3 changed files with 351 additions and 285 deletions

View File

@ -1,28 +1,41 @@
# HTTP Output Plugin
This plugin writes to a HTTP Server using the `POST Method`.
Data collected from telegraf is sent in the Request Body.
This plugin sends metrics in a HTTP message encoded using one of the output
data formats. For data_formats that support batching, metrics are sent in batch format.
### Configuration:
```toml
# Send telegraf metrics to HTTP Server(s)
# A plugin that can transmit metrics over HTTP
[[outputs.http]]
## It requires a url name.
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
## URL is the address to send metrics to
url = "http://127.0.0.1:8080/metric"
## Configure dial timeout in seconds. Default : 3
timeout = 3
## http_headers option can add a custom header to the request.
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out.
[outputs.http.headers]
Content-Type = "plain/text"
## Timeout for HTTP message
# timeout = "5s"
## HTTP method, one of: "POST" or "PUT"
# method = "POST"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Additional HTTP headers
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
# data_format = "influx"
```

View File

@ -3,103 +3,119 @@ package http
import (
"bytes"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
var sampleConfig = `
## It requires a url name.
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
## URL is the address to send metrics to
url = "http://127.0.0.1:8080/metric"
## Configure dial timeout in seconds. Default : 3
timeout = 3
## http_headers option can add a custom header to the request.
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out.
[outputs.http.headers]
Content-Type = "plain/text"
## Timeout for HTTP message
# timeout = "5s"
## HTTP method, one of: "POST" or "PUT"
# method = "POST"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Additional HTTP headers
# [outputs.http.headers]
# # Should be set to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
# data_format = "influx"
`
const (
POST = "POST"
DEFAULT_TIME_OUT = 3
CONTENT_TYPE = "content-type"
defaultClientTimeout = 5 * time.Second
defaultContentType = "text/plain; charset=utf-8"
defaultMethod = http.MethodPost
)
type Http struct {
// http required option
type HTTP struct {
URL string `toml:"url"`
Headers map[string]string
Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
tls.ClientConfig
// Option with http default value
Timeout int `toml:"timeout"`
client http.Client
client *http.Client
serializer serializers.Serializer
}
func (h *Http) SetSerializer(serializer serializers.Serializer) {
func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}
// Connect to the Output
func (h *Http) Connect() error {
h.client = http.Client{
func (h *HTTP) Connect() error {
if h.Method == "" {
h.Method = http.MethodPost
}
h.Method = strings.ToUpper(h.Method)
if h.Method != http.MethodPost && h.Method != http.MethodPut {
return fmt.Errorf("invalid method [%s] %s", h.URL, h.Method)
}
if h.Timeout.Duration == 0 {
h.Timeout.Duration = defaultClientTimeout
}
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(h.Timeout) * time.Second,
}
var isValid bool
for k := range h.Headers {
if strings.ToLower(k) == CONTENT_TYPE {
isValid = true
}
}
if !isValid {
return fmt.Errorf("E! httpHeader require content-type!")
Timeout: h.Timeout.Duration,
}
return nil
}
// Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern.
func (h *Http) Close() error {
func (h *HTTP) Close() error {
return nil
}
// Description A plugin that can transmit metrics over HTTP
func (h *Http) Description() string {
func (h *HTTP) Description() string {
return "A plugin that can transmit metrics over HTTP"
}
// SampleConfig provides sample example for developer
func (h *Http) SampleConfig() string {
func (h *HTTP) SampleConfig() string {
return sampleConfig
}
// Writes metrics over HTTP POST
func (h *Http) Write(metrics []telegraf.Metric) error {
func (h *HTTP) Write(metrics []telegraf.Metric) error {
reqBody, err := h.serializer.SerializeBatch(metrics)
if err != nil {
return fmt.Errorf("E! Error serializing some metrics: %s", err.Error())
return err
}
if err := h.write(reqBody); err != nil {
@ -109,9 +125,10 @@ func (h *Http) Write(metrics []telegraf.Metric) error {
return nil
}
func (h *Http) write(reqBody []byte) error {
req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody))
func (h *HTTP) write(reqBody []byte) error {
req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody))
req.Header.Set("Content-Type", defaultContentType)
for k, v := range h.Headers {
req.Header.Set(k, v)
}
@ -123,20 +140,8 @@ func (h *Http) write(reqBody []byte) error {
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err := h.isOk(resp, err); err != nil {
return err
}
return nil
}
func (h *Http) isOk(resp *http.Response, err error) error {
if resp == nil || err != nil {
return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error())
}
if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("when writing to [%s] received status code: %d", h.URL, resp.StatusCode)
}
return nil
@ -144,8 +149,9 @@ func (h *Http) isOk(resp *http.Response, err error) error {
func init() {
outputs.Add("http", func() telegraf.Output {
return &Http{
Timeout: DEFAULT_TIME_OUT,
return &HTTP{
Timeout: internal.Duration{Duration: defaultClientTimeout},
Method: defaultMethod,
}
})
}

View File

@ -2,243 +2,290 @@ package http
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require"
)
var (
cpuTags = map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
cpuField = map[string]interface{}{
"usage_idle": float64(91.5),
}
memTags = map[string]string{
"host": "localhost",
"cpu": "mem",
"datacenter": "us-west-2",
}
memField = map[string]interface{}{
"used": float64(91.5),
}
count int
)
type TestOkHandler struct {
T *testing.T
Expected []string
}
// The handler gets a new variable each time it receives a request, so it fetches an expected string based on global variable.
func (h TestOkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
actual, _ := ioutil.ReadAll(r.Body)
assert.Equal(h.T, h.Expected[count], string(actual), fmt.Sprintf("%d Expected fail!", count))
count++
fmt.Fprint(w, "ok")
}
type TestNotFoundHandler struct {
}
func (h TestNotFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
func TestWriteAllInputMetric(t *testing.T) {
now := time.Now()
server := httptest.NewServer(&TestOkHandler{
T: t,
Expected: []string{
fmt.Sprintf("telegraf.cpu0.us-west-2.localhost.cpu.usage_idle 91.5 %d\ntelegraf.mem.us-west-2.localhost.mem.used 91.5 %d\n", now.Unix(), now.Unix()),
func getMetric() telegraf.Metric {
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
})
defer server.Close()
defer resetCount()
time.Unix(0, 0),
)
if err != nil {
panic(err)
}
return m
}
m1, _ := metric.New("cpu", cpuTags, cpuField, now)
m2, _ := metric.New("mem", memTags, memField, now)
metrics := []telegraf.Metric{m1, m2}
func TestInvalidMethod(t *testing.T) {
plugin := &HTTP{
URL: "",
Method: http.MethodGet,
}
http := &Http{
URL: server.URL,
Headers: map[string]string{
"Content-Type": "plain/text",
err := plugin.Connect()
require.Error(t, err)
}
func TestMethod(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
plugin *HTTP
expectedMethod string
connectError bool
}{
{
name: "default method is POST",
plugin: &HTTP{
URL: u.String(),
Method: defaultMethod,
},
expectedMethod: http.MethodPost,
},
{
name: "put is okay",
plugin: &HTTP{
URL: u.String(),
Method: http.MethodPut,
},
expectedMethod: http.MethodPut,
},
{
name: "get is invalid",
plugin: &HTTP{
URL: u.String(),
Method: http.MethodGet,
},
connectError: true,
},
{
name: "method is case insensitive",
plugin: &HTTP{
URL: u.String(),
Method: "poST",
},
expectedMethod: http.MethodPost,
},
}
http.SetSerializer(&graphite.GraphiteSerializer{
Prefix: "telegraf",
Template: "tags.measurement.field",
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, tt.expectedMethod, r.Method)
w.WriteHeader(http.StatusOK)
})
http.Connect()
err := http.Write(metrics)
serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
if tt.connectError {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.NoError(t, err)
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}
func TestHttpWriteWithUnexpected404StatusCode(t *testing.T) {
now := time.Now()
func TestStatusCode(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
server := httptest.NewServer(&TestNotFoundHandler{})
defer server.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
m, _ := metric.New("cpu", cpuTags, cpuField, now)
metrics := []telegraf.Metric{m}
http := &Http{
URL: server.URL,
Headers: map[string]string{
"Content-Type": "application/json",
tests := []struct {
name string
plugin *HTTP
statusCode int
errFunc func(t *testing.T, err error)
}{
{
name: "success",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusOK,
errFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "1xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: 103,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "3xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusMultipleChoices,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "4xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusMultipleChoices,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
}
http.SetSerializer(&graphite.GraphiteSerializer{
Prefix: "telegraf",
Template: "tags.measurement.field",
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tt.statusCode)
})
http.Connect()
err := http.Write(metrics)
serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err)
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
tt.errFunc(t, err)
})
}
}
func TestHttpWriteWithExpected404StatusCode(t *testing.T) {
now := time.Now()
func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
server := httptest.NewServer(&TestNotFoundHandler{})
defer server.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
m, _ := metric.New("cpu", cpuTags, cpuField, now)
metrics := []telegraf.Metric{m}
http := &Http{
URL: server.URL,
Headers: map[string]string{
"Content-Type": "application/json",
tests := []struct {
name string
plugin *HTTP
expected string
}{
{
name: "default is text plain",
plugin: &HTTP{
URL: u.String(),
},
expected: defaultContentType,
},
{
name: "overwrite content_type",
plugin: &HTTP{
URL: u.String(),
Headers: map[string]string{"Content-Type": "application/json"},
},
expected: "application/json",
},
}
http.SetSerializer(&graphite.GraphiteSerializer{
Prefix: "telegraf",
Template: "tags.measurement.field",
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, tt.expected, r.Header.Get("Content-Type"))
w.WriteHeader(http.StatusOK)
})
http.Connect()
err := http.Write(metrics)
serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err)
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}
func TestHttpWriteWithIncorrectServerPort(t *testing.T) {
now := time.Now()
func TestBasicAuth(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
m, _ := metric.New("cpu", cpuTags, cpuField, now)
metrics := []telegraf.Metric{m}
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
http := &Http{
URL: "http://127.0.0.1:56879/incorrect/url",
Headers: map[string]string{
"Content-Type": "application/json",
tests := []struct {
name string
plugin *HTTP
username string
password string
}{
{
name: "default",
plugin: &HTTP{
URL: u.String(),
},
},
{
name: "username only",
plugin: &HTTP{
URL: u.String(),
Username: "username",
},
},
{
name: "password only",
plugin: &HTTP{
URL: u.String(),
Password: "pa$$word",
},
},
{
name: "username and password",
plugin: &HTTP{
URL: u.String(),
Username: "username",
Password: "pa$$word",
},
},
}
http.SetSerializer(&graphite.GraphiteSerializer{
Prefix: "telegraf",
Template: "tags.measurement.field",
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, _ := r.BasicAuth()
require.Equal(t, tt.username, username)
require.Equal(t, tt.password, password)
w.WriteHeader(http.StatusOK)
})
http.Connect()
err := http.Write(metrics)
serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err)
}
func TestHttpWriteWithHttpSerializer(t *testing.T) {
now := time.Now()
server := httptest.NewServer(&TestOkHandler{
T: t,
Expected: []string{
fmt.Sprintf("{\"metrics\":[{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d},{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d}]}", now.Unix(), now.Unix()),
},
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
defer server.Close()
http := &Http{
URL: server.URL,
Headers: map[string]string{
"Content-Type": "application/json",
},
}
jsonSerializer, _ := json.NewSerializer(time.Second)
http.SetSerializer(jsonSerializer)
m1, _ := metric.New("cpu", cpuTags, cpuField, now)
m2, _ := metric.New("cpu", cpuTags, cpuField, now)
metrics := []telegraf.Metric{m1, m2}
http.Connect()
err := http.Write(metrics)
assert.Nil(t, err)
}
func TestHttpWithoutContentType(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/correct/url",
}
err := http.Connect()
assert.Error(t, err)
}
func TestHttpWithContentType(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/correct/url",
Headers: map[string]string{
"Content-Type": "application/json",
},
}
err := http.Connect()
assert.Nil(t, err)
}
func TestImplementedInterfaceFunction(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/incorrect/url",
Headers: map[string]string{
"Content-Type": "application/json",
},
}
assert.NotNil(t, http.SampleConfig())
assert.NotNil(t, http.Description())
}
func resetCount() {
count = 0
}