Add method, basic auth, and tls support to http output

This commit is contained in:
Daniel Nelson 2018-05-14 17:11:44 -07:00
parent 190a4128c5
commit d5f57715dc
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
3 changed files with 351 additions and 285 deletions

View File

@ -1,28 +1,41 @@
# HTTP Output Plugin # HTTP Output Plugin
This plugin writes to a HTTP Server using the `POST Method`. This plugin sends metrics in a HTTP message encoded using one of the output
data formats. For data_formats that support batching, metrics are sent in batch format.
Data collected from telegraf is sent in the Request Body.
### Configuration: ### Configuration:
```toml ```toml
# Send telegraf metrics to HTTP Server(s) # A plugin that can transmit metrics over HTTP
[[outputs.http]] [[outputs.http]]
## It requires a url name. ## URL is the address to send metrics to
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
url = "http://127.0.0.1:8080/metric" url = "http://127.0.0.1:8080/metric"
## Configure dial timeout in seconds. Default : 3
timeout = 3 ## Timeout for HTTP message
## http_headers option can add a custom header to the request. # timeout = "5s"
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. ## HTTP method, one of: "POST" or "PUT"
[outputs.http.headers] # method = "POST"
Content-Type = "plain/text"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Additional HTTP headers
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx" # data_format = "influx"
``` ```

View File

@ -3,103 +3,119 @@ package http
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
) )
var sampleConfig = ` var sampleConfig = `
## It requires a url name. ## URL is the address to send metrics to
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
url = "http://127.0.0.1:8080/metric" url = "http://127.0.0.1:8080/metric"
## Configure dial timeout in seconds. Default : 3
timeout = 3 ## Timeout for HTTP message
## http_headers option can add a custom header to the request. # timeout = "5s"
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. ## HTTP method, one of: "POST" or "PUT"
[outputs.http.headers] # method = "POST"
Content-Type = "plain/text"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Additional HTTP headers
# [outputs.http.headers]
# # Should be set to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx" # data_format = "influx"
` `
const ( const (
POST = "POST" defaultClientTimeout = 5 * time.Second
defaultContentType = "text/plain; charset=utf-8"
DEFAULT_TIME_OUT = 3 defaultMethod = http.MethodPost
CONTENT_TYPE = "content-type"
) )
type Http struct { type HTTP struct {
// http required option URL string `toml:"url"`
URL string `toml:"url"` Timeout internal.Duration `toml:"timeout"`
Headers map[string]string Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
tls.ClientConfig
// Option with http default value client *http.Client
Timeout int `toml:"timeout"`
client http.Client
serializer serializers.Serializer serializer serializers.Serializer
} }
func (h *Http) SetSerializer(serializer serializers.Serializer) { func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer h.serializer = serializer
} }
// Connect to the Output func (h *HTTP) Connect() error {
func (h *Http) Connect() error { if h.Method == "" {
h.client = http.Client{ h.Method = http.MethodPost
}
h.Method = strings.ToUpper(h.Method)
if h.Method != http.MethodPost && h.Method != http.MethodPut {
return fmt.Errorf("invalid method [%s] %s", h.URL, h.Method)
}
if h.Timeout.Duration == 0 {
h.Timeout.Duration = defaultClientTimeout
}
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}
h.client = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment, TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
}, },
Timeout: time.Duration(h.Timeout) * time.Second, Timeout: h.Timeout.Duration,
}
var isValid bool
for k := range h.Headers {
if strings.ToLower(k) == CONTENT_TYPE {
isValid = true
}
}
if !isValid {
return fmt.Errorf("E! httpHeader require content-type!")
} }
return nil return nil
} }
// Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern. func (h *HTTP) Close() error {
func (h *Http) Close() error {
return nil return nil
} }
// Description A plugin that can transmit metrics over HTTP func (h *HTTP) Description() string {
func (h *Http) Description() string {
return "A plugin that can transmit metrics over HTTP" return "A plugin that can transmit metrics over HTTP"
} }
// SampleConfig provides sample example for developer func (h *HTTP) SampleConfig() string {
func (h *Http) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Writes metrics over HTTP POST func (h *HTTP) Write(metrics []telegraf.Metric) error {
func (h *Http) Write(metrics []telegraf.Metric) error {
reqBody, err := h.serializer.SerializeBatch(metrics) reqBody, err := h.serializer.SerializeBatch(metrics)
if err != nil { if err != nil {
return fmt.Errorf("E! Error serializing some metrics: %s", err.Error()) return err
} }
if err := h.write(reqBody); err != nil { if err := h.write(reqBody); err != nil {
@ -109,9 +125,10 @@ func (h *Http) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
func (h *Http) write(reqBody []byte) error { func (h *HTTP) write(reqBody []byte) error {
req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody)) req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody))
req.Header.Set("Content-Type", defaultContentType)
for k, v := range h.Headers { for k, v := range h.Headers {
req.Header.Set(k, v) req.Header.Set(k, v)
} }
@ -123,20 +140,8 @@ func (h *Http) write(reqBody []byte) error {
defer resp.Body.Close() defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body) _, err = ioutil.ReadAll(resp.Body)
if err := h.isOk(resp, err); err != nil { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return err return fmt.Errorf("when writing to [%s] received status code: %d", h.URL, resp.StatusCode)
}
return nil
}
func (h *Http) isOk(resp *http.Response, err error) error {
if resp == nil || err != nil {
return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error())
}
if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
} }
return nil return nil
@ -144,8 +149,9 @@ func (h *Http) isOk(resp *http.Response, err error) error {
func init() { func init() {
outputs.Add("http", func() telegraf.Output { outputs.Add("http", func() telegraf.Output {
return &Http{ return &HTTP{
Timeout: DEFAULT_TIME_OUT, Timeout: internal.Duration{Duration: defaultClientTimeout},
Method: defaultMethod,
} }
}) })
} }

View File

@ -2,243 +2,290 @@ package http
import ( import (
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require"
) )
var ( func getMetric() telegraf.Metric {
cpuTags = map[string]string{ m, err := metric.New(
"host": "localhost", "cpu",
"cpu": "cpu0", map[string]string{},
"datacenter": "us-west-2", map[string]interface{}{
} "value": 42.0,
cpuField = map[string]interface{}{
"usage_idle": float64(91.5),
}
memTags = map[string]string{
"host": "localhost",
"cpu": "mem",
"datacenter": "us-west-2",
}
memField = map[string]interface{}{
"used": float64(91.5),
}
count int
)
type TestOkHandler struct {
T *testing.T
Expected []string
}
// The handler gets a new variable each time it receives a request, so it fetches an expected string based on global variable.
func (h TestOkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
actual, _ := ioutil.ReadAll(r.Body)
assert.Equal(h.T, h.Expected[count], string(actual), fmt.Sprintf("%d Expected fail!", count))
count++
fmt.Fprint(w, "ok")
}
type TestNotFoundHandler struct {
}
func (h TestNotFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
func TestWriteAllInputMetric(t *testing.T) {
now := time.Now()
server := httptest.NewServer(&TestOkHandler{
T: t,
Expected: []string{
fmt.Sprintf("telegraf.cpu0.us-west-2.localhost.cpu.usage_idle 91.5 %d\ntelegraf.mem.us-west-2.localhost.mem.used 91.5 %d\n", now.Unix(), now.Unix()),
}, },
}) time.Unix(0, 0),
defer server.Close() )
defer resetCount() if err != nil {
panic(err)
}
return m
}
m1, _ := metric.New("cpu", cpuTags, cpuField, now) func TestInvalidMethod(t *testing.T) {
m2, _ := metric.New("mem", memTags, memField, now) plugin := &HTTP{
metrics := []telegraf.Metric{m1, m2} URL: "",
Method: http.MethodGet,
}
http := &Http{ err := plugin.Connect()
URL: server.URL, require.Error(t, err)
Headers: map[string]string{ }
"Content-Type": "plain/text",
func TestMethod(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
plugin *HTTP
expectedMethod string
connectError bool
}{
{
name: "default method is POST",
plugin: &HTTP{
URL: u.String(),
Method: defaultMethod,
},
expectedMethod: http.MethodPost,
},
{
name: "put is okay",
plugin: &HTTP{
URL: u.String(),
Method: http.MethodPut,
},
expectedMethod: http.MethodPut,
},
{
name: "get is invalid",
plugin: &HTTP{
URL: u.String(),
Method: http.MethodGet,
},
connectError: true,
},
{
name: "method is case insensitive",
plugin: &HTTP{
URL: u.String(),
Method: "poST",
},
expectedMethod: http.MethodPost,
}, },
} }
http.SetSerializer(&graphite.GraphiteSerializer{ for _, tt := range tests {
Prefix: "telegraf", t.Run(tt.name, func(t *testing.T) {
Template: "tags.measurement.field", ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}) require.Equal(t, tt.expectedMethod, r.Method)
w.WriteHeader(http.StatusOK)
})
http.Connect() serializer := influx.NewSerializer()
err := http.Write(metrics) tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
if tt.connectError {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.NoError(t, err) err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
} }
func TestHttpWriteWithUnexpected404StatusCode(t *testing.T) { func TestStatusCode(t *testing.T) {
now := time.Now() ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
server := httptest.NewServer(&TestNotFoundHandler{}) u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
defer server.Close() require.NoError(t, err)
m, _ := metric.New("cpu", cpuTags, cpuField, now) tests := []struct {
metrics := []telegraf.Metric{m} name string
plugin *HTTP
http := &Http{ statusCode int
URL: server.URL, errFunc func(t *testing.T, err error)
Headers: map[string]string{ }{
"Content-Type": "application/json", {
name: "success",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusOK,
errFunc: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "1xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: 103,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "3xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusMultipleChoices,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "4xx status is an error",
plugin: &HTTP{
URL: u.String(),
},
statusCode: http.StatusMultipleChoices,
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
}, },
} }
http.SetSerializer(&graphite.GraphiteSerializer{ for _, tt := range tests {
Prefix: "telegraf", t.Run(tt.name, func(t *testing.T) {
Template: "tags.measurement.field", ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}) w.WriteHeader(tt.statusCode)
})
http.Connect() serializer := influx.NewSerializer()
err := http.Write(metrics) tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err) err = tt.plugin.Write([]telegraf.Metric{getMetric()})
tt.errFunc(t, err)
})
}
} }
func TestHttpWriteWithExpected404StatusCode(t *testing.T) { func TestContentType(t *testing.T) {
now := time.Now() ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
server := httptest.NewServer(&TestNotFoundHandler{}) u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
defer server.Close() require.NoError(t, err)
m, _ := metric.New("cpu", cpuTags, cpuField, now) tests := []struct {
metrics := []telegraf.Metric{m} name string
plugin *HTTP
http := &Http{ expected string
URL: server.URL, }{
Headers: map[string]string{ {
"Content-Type": "application/json", name: "default is text plain",
plugin: &HTTP{
URL: u.String(),
},
expected: defaultContentType,
},
{
name: "overwrite content_type",
plugin: &HTTP{
URL: u.String(),
Headers: map[string]string{"Content-Type": "application/json"},
},
expected: "application/json",
}, },
} }
http.SetSerializer(&graphite.GraphiteSerializer{ for _, tt := range tests {
Prefix: "telegraf", t.Run(tt.name, func(t *testing.T) {
Template: "tags.measurement.field", ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}) require.Equal(t, tt.expected, r.Header.Get("Content-Type"))
w.WriteHeader(http.StatusOK)
})
http.Connect() serializer := influx.NewSerializer()
err := http.Write(metrics) tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err) err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
} }
func TestHttpWriteWithIncorrectServerPort(t *testing.T) { func TestBasicAuth(t *testing.T) {
now := time.Now() ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
m, _ := metric.New("cpu", cpuTags, cpuField, now) u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
metrics := []telegraf.Metric{m} require.NoError(t, err)
http := &Http{ tests := []struct {
URL: "http://127.0.0.1:56879/incorrect/url", name string
Headers: map[string]string{ plugin *HTTP
"Content-Type": "application/json", username string
password string
}{
{
name: "default",
plugin: &HTTP{
URL: u.String(),
},
},
{
name: "username only",
plugin: &HTTP{
URL: u.String(),
Username: "username",
},
},
{
name: "password only",
plugin: &HTTP{
URL: u.String(),
Password: "pa$$word",
},
},
{
name: "username and password",
plugin: &HTTP{
URL: u.String(),
Username: "username",
Password: "pa$$word",
},
}, },
} }
http.SetSerializer(&graphite.GraphiteSerializer{ for _, tt := range tests {
Prefix: "telegraf", t.Run(tt.name, func(t *testing.T) {
Template: "tags.measurement.field", ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}) username, password, _ := r.BasicAuth()
require.Equal(t, tt.username, username)
require.Equal(t, tt.password, password)
w.WriteHeader(http.StatusOK)
})
http.Connect() serializer := influx.NewSerializer()
err := http.Write(metrics) tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
assert.Error(t, err) err = tt.plugin.Write([]telegraf.Metric{getMetric()})
} require.NoError(t, err)
})
func TestHttpWriteWithHttpSerializer(t *testing.T) {
now := time.Now()
server := httptest.NewServer(&TestOkHandler{
T: t,
Expected: []string{
fmt.Sprintf("{\"metrics\":[{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d},{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d}]}", now.Unix(), now.Unix()),
},
})
defer server.Close()
http := &Http{
URL: server.URL,
Headers: map[string]string{
"Content-Type": "application/json",
},
} }
jsonSerializer, _ := json.NewSerializer(time.Second)
http.SetSerializer(jsonSerializer)
m1, _ := metric.New("cpu", cpuTags, cpuField, now)
m2, _ := metric.New("cpu", cpuTags, cpuField, now)
metrics := []telegraf.Metric{m1, m2}
http.Connect()
err := http.Write(metrics)
assert.Nil(t, err)
}
func TestHttpWithoutContentType(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/correct/url",
}
err := http.Connect()
assert.Error(t, err)
}
func TestHttpWithContentType(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/correct/url",
Headers: map[string]string{
"Content-Type": "application/json",
},
}
err := http.Connect()
assert.Nil(t, err)
}
func TestImplementedInterfaceFunction(t *testing.T) {
http := &Http{
URL: "http://127.0.0.1:56879/incorrect/url",
Headers: map[string]string{
"Content-Type": "application/json",
},
}
assert.NotNil(t, http.SampleConfig())
assert.NotNil(t, http.Description())
}
func resetCount() {
count = 0
} }