diff --git a/plugins/outputs/http/README.md b/plugins/outputs/http/README.md index b7dd75276..2af3fb27a 100644 --- a/plugins/outputs/http/README.md +++ b/plugins/outputs/http/README.md @@ -1,28 +1,41 @@ # HTTP Output Plugin -This plugin writes to a HTTP Server using the `POST Method`. - -Data collected from telegraf is sent in the Request Body. +This plugin sends metrics in a HTTP message encoded using one of the output +data formats. For data_formats that support batching, metrics are sent in batch format. ### Configuration: ```toml -# Send telegraf metrics to HTTP Server(s) +# A plugin that can transmit metrics over HTTP [[outputs.http]] - ## It requires a url name. - ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. - ## Note that not support the HTTPS. + ## URL is the address to send metrics to url = "http://127.0.0.1:8080/metric" - ## Configure dial timeout in seconds. Default : 3 - timeout = 3 - ## http_headers option can add a custom header to the request. - ## Content-Type is required http header in http plugin. - ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. - [outputs.http.headers] - Content-Type = "plain/text" + + ## Timeout for HTTP message + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT" + # method = "POST" + + ## HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Additional HTTP headers + # [outputs.http.headers] + # # Should be set manually to "application/json" for json data_format + # Content-Type = "text/plain; charset=utf-8" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + ## Data format to output. ## Each data format has it's own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md - data_format = "influx" + # data_format = "influx" ``` diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index e78031b8c..4b769a467 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -3,103 +3,119 @@ package http import ( "bytes" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf/plugins/serializers" "io/ioutil" "net/http" "strings" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" ) var sampleConfig = ` - ## It requires a url name. - ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. - ## Note that not support the HTTPS. + ## URL is the address to send metrics to url = "http://127.0.0.1:8080/metric" - ## Configure dial timeout in seconds. Default : 3 - timeout = 3 - ## http_headers option can add a custom header to the request. - ## Content-Type is required http header in http plugin. - ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. - [outputs.http.headers] - Content-Type = "plain/text" + + ## Timeout for HTTP message + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT" + # method = "POST" + + ## HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Additional HTTP headers + # [outputs.http.headers] + # # Should be set to "application/json" for json data_format + # Content-Type = "text/plain; charset=utf-8" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + ## Data format to output. ## Each data format has it's own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md - data_format = "influx" + # data_format = "influx" ` const ( - POST = "POST" - - DEFAULT_TIME_OUT = 3 - - CONTENT_TYPE = "content-type" + defaultClientTimeout = 5 * time.Second + defaultContentType = "text/plain; charset=utf-8" + defaultMethod = http.MethodPost ) -type Http struct { - // http required option - URL string `toml:"url"` - Headers map[string]string +type HTTP struct { + URL string `toml:"url"` + Timeout internal.Duration `toml:"timeout"` + Method string `toml:"method"` + Username string `toml:"username"` + Password string `toml:"password"` + Headers map[string]string `toml:"headers"` + tls.ClientConfig - // Option with http default value - Timeout int `toml:"timeout"` - - client http.Client + client *http.Client serializer serializers.Serializer } -func (h *Http) SetSerializer(serializer serializers.Serializer) { +func (h *HTTP) SetSerializer(serializer serializers.Serializer) { h.serializer = serializer } -// Connect to the Output -func (h *Http) Connect() error { - h.client = http.Client{ +func (h *HTTP) Connect() error { + if h.Method == "" { + h.Method = http.MethodPost + } + h.Method = strings.ToUpper(h.Method) + if h.Method != http.MethodPost && h.Method != http.MethodPut { + return fmt.Errorf("invalid method [%s] %s", h.URL, h.Method) + } + + if h.Timeout.Duration == 0 { + h.Timeout.Duration = defaultClientTimeout + } + + tlsCfg, err := h.ClientConfig.TLSConfig() + if err != nil { + return err + } + + h.client = &http.Client{ Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, }, - Timeout: time.Duration(h.Timeout) * time.Second, - } - - var isValid bool - - for k := range h.Headers { - if strings.ToLower(k) == CONTENT_TYPE { - isValid = true - } - } - - if !isValid { - return fmt.Errorf("E! httpHeader require content-type!") + Timeout: h.Timeout.Duration, } return nil } -// Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern. -func (h *Http) Close() error { +func (h *HTTP) Close() error { return nil } -// Description A plugin that can transmit metrics over HTTP -func (h *Http) Description() string { +func (h *HTTP) Description() string { return "A plugin that can transmit metrics over HTTP" } -// SampleConfig provides sample example for developer -func (h *Http) SampleConfig() string { +func (h *HTTP) SampleConfig() string { return sampleConfig } -// Writes metrics over HTTP POST -func (h *Http) Write(metrics []telegraf.Metric) error { +func (h *HTTP) Write(metrics []telegraf.Metric) error { reqBody, err := h.serializer.SerializeBatch(metrics) - if err != nil { - return fmt.Errorf("E! Error serializing some metrics: %s", err.Error()) + return err } if err := h.write(reqBody); err != nil { @@ -109,9 +125,10 @@ func (h *Http) Write(metrics []telegraf.Metric) error { return nil } -func (h *Http) write(reqBody []byte) error { - req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody)) +func (h *HTTP) write(reqBody []byte) error { + req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody)) + req.Header.Set("Content-Type", defaultContentType) for k, v := range h.Headers { req.Header.Set(k, v) } @@ -123,20 +140,8 @@ func (h *Http) write(reqBody []byte) error { defer resp.Body.Close() _, err = ioutil.ReadAll(resp.Body) - if err := h.isOk(resp, err); err != nil { - return err - } - - return nil -} - -func (h *Http) isOk(resp *http.Response, err error) error { - if resp == nil || err != nil { - return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error()) - } - - if resp.StatusCode < 200 || resp.StatusCode > 209 { - return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("when writing to [%s] received status code: %d", h.URL, resp.StatusCode) } return nil @@ -144,8 +149,9 @@ func (h *Http) isOk(resp *http.Response, err error) error { func init() { outputs.Add("http", func() telegraf.Output { - return &Http{ - Timeout: DEFAULT_TIME_OUT, + return &HTTP{ + Timeout: internal.Duration{Duration: defaultClientTimeout}, + Method: defaultMethod, } }) } diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index aa9c68eba..1d511d85b 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -2,243 +2,290 @@ package http import ( "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/serializers/graphite" - "github.com/influxdata/telegraf/plugins/serializers/json" - "github.com/stretchr/testify/assert" - "io/ioutil" "net/http" "net/http/httptest" + "net/url" "testing" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers/influx" + "github.com/stretchr/testify/require" ) -var ( - cpuTags = map[string]string{ - "host": "localhost", - "cpu": "cpu0", - "datacenter": "us-west-2", - } - - cpuField = map[string]interface{}{ - "usage_idle": float64(91.5), - } - - memTags = map[string]string{ - "host": "localhost", - "cpu": "mem", - "datacenter": "us-west-2", - } - - memField = map[string]interface{}{ - "used": float64(91.5), - } - - count int -) - -type TestOkHandler struct { - T *testing.T - Expected []string -} - -// The handler gets a new variable each time it receives a request, so it fetches an expected string based on global variable. -func (h TestOkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - actual, _ := ioutil.ReadAll(r.Body) - - assert.Equal(h.T, h.Expected[count], string(actual), fmt.Sprintf("%d Expected fail!", count)) - - count++ - - fmt.Fprint(w, "ok") -} - -type TestNotFoundHandler struct { -} - -func (h TestNotFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - http.NotFound(w, r) -} - -func TestWriteAllInputMetric(t *testing.T) { - now := time.Now() - - server := httptest.NewServer(&TestOkHandler{ - T: t, - Expected: []string{ - fmt.Sprintf("telegraf.cpu0.us-west-2.localhost.cpu.usage_idle 91.5 %d\ntelegraf.mem.us-west-2.localhost.mem.used 91.5 %d\n", now.Unix(), now.Unix()), +func getMetric() telegraf.Metric { + m, err := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, }, - }) - defer server.Close() - defer resetCount() + time.Unix(0, 0), + ) + if err != nil { + panic(err) + } + return m +} - m1, _ := metric.New("cpu", cpuTags, cpuField, now) - m2, _ := metric.New("mem", memTags, memField, now) - metrics := []telegraf.Metric{m1, m2} +func TestInvalidMethod(t *testing.T) { + plugin := &HTTP{ + URL: "", + Method: http.MethodGet, + } - http := &Http{ - URL: server.URL, - Headers: map[string]string{ - "Content-Type": "plain/text", + err := plugin.Connect() + require.Error(t, err) +} + +func TestMethod(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *HTTP + expectedMethod string + connectError bool + }{ + { + name: "default method is POST", + plugin: &HTTP{ + URL: u.String(), + Method: defaultMethod, + }, + expectedMethod: http.MethodPost, + }, + { + name: "put is okay", + plugin: &HTTP{ + URL: u.String(), + Method: http.MethodPut, + }, + expectedMethod: http.MethodPut, + }, + { + name: "get is invalid", + plugin: &HTTP{ + URL: u.String(), + Method: http.MethodGet, + }, + connectError: true, + }, + { + name: "method is case insensitive", + plugin: &HTTP{ + URL: u.String(), + Method: "poST", + }, + expectedMethod: http.MethodPost, }, } - http.SetSerializer(&graphite.GraphiteSerializer{ - Prefix: "telegraf", - Template: "tags.measurement.field", - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expectedMethod, r.Method) + w.WriteHeader(http.StatusOK) + }) - http.Connect() - err := http.Write(metrics) + serializer := influx.NewSerializer() + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + if tt.connectError { + require.Error(t, err) + return + } + require.NoError(t, err) - assert.NoError(t, err) + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } } -func TestHttpWriteWithUnexpected404StatusCode(t *testing.T) { - now := time.Now() +func TestStatusCode(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() - server := httptest.NewServer(&TestNotFoundHandler{}) - defer server.Close() + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) - m, _ := metric.New("cpu", cpuTags, cpuField, now) - metrics := []telegraf.Metric{m} - - http := &Http{ - URL: server.URL, - Headers: map[string]string{ - "Content-Type": "application/json", + tests := []struct { + name string + plugin *HTTP + statusCode int + errFunc func(t *testing.T, err error) + }{ + { + name: "success", + plugin: &HTTP{ + URL: u.String(), + }, + statusCode: http.StatusOK, + errFunc: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, + { + name: "1xx status is an error", + plugin: &HTTP{ + URL: u.String(), + }, + statusCode: 103, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "3xx status is an error", + plugin: &HTTP{ + URL: u.String(), + }, + statusCode: http.StatusMultipleChoices, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "4xx status is an error", + plugin: &HTTP{ + URL: u.String(), + }, + statusCode: http.StatusMultipleChoices, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, }, } - http.SetSerializer(&graphite.GraphiteSerializer{ - Prefix: "telegraf", - Template: "tags.measurement.field", - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + }) - http.Connect() - err := http.Write(metrics) + serializer := influx.NewSerializer() + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + require.NoError(t, err) - assert.Error(t, err) + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + tt.errFunc(t, err) + }) + } } -func TestHttpWriteWithExpected404StatusCode(t *testing.T) { - now := time.Now() +func TestContentType(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() - server := httptest.NewServer(&TestNotFoundHandler{}) - defer server.Close() + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) - m, _ := metric.New("cpu", cpuTags, cpuField, now) - metrics := []telegraf.Metric{m} - - http := &Http{ - URL: server.URL, - Headers: map[string]string{ - "Content-Type": "application/json", + tests := []struct { + name string + plugin *HTTP + expected string + }{ + { + name: "default is text plain", + plugin: &HTTP{ + URL: u.String(), + }, + expected: defaultContentType, + }, + { + name: "overwrite content_type", + plugin: &HTTP{ + URL: u.String(), + Headers: map[string]string{"Content-Type": "application/json"}, + }, + expected: "application/json", }, } - http.SetSerializer(&graphite.GraphiteSerializer{ - Prefix: "telegraf", - Template: "tags.measurement.field", - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expected, r.Header.Get("Content-Type")) + w.WriteHeader(http.StatusOK) + }) - http.Connect() - err := http.Write(metrics) + serializer := influx.NewSerializer() + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + require.NoError(t, err) - assert.Error(t, err) + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } } -func TestHttpWriteWithIncorrectServerPort(t *testing.T) { - now := time.Now() +func TestBasicAuth(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() - m, _ := metric.New("cpu", cpuTags, cpuField, now) - metrics := []telegraf.Metric{m} + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) - http := &Http{ - URL: "http://127.0.0.1:56879/incorrect/url", - Headers: map[string]string{ - "Content-Type": "application/json", + tests := []struct { + name string + plugin *HTTP + username string + password string + }{ + { + name: "default", + plugin: &HTTP{ + URL: u.String(), + }, + }, + { + name: "username only", + plugin: &HTTP{ + URL: u.String(), + Username: "username", + }, + }, + { + name: "password only", + plugin: &HTTP{ + URL: u.String(), + Password: "pa$$word", + }, + }, + { + name: "username and password", + plugin: &HTTP{ + URL: u.String(), + Username: "username", + Password: "pa$$word", + }, }, } - http.SetSerializer(&graphite.GraphiteSerializer{ - Prefix: "telegraf", - Template: "tags.measurement.field", - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, _ := r.BasicAuth() + require.Equal(t, tt.username, username) + require.Equal(t, tt.password, password) + w.WriteHeader(http.StatusOK) + }) - http.Connect() - err := http.Write(metrics) + serializer := influx.NewSerializer() + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + require.NoError(t, err) - assert.Error(t, err) -} - -func TestHttpWriteWithHttpSerializer(t *testing.T) { - now := time.Now() - - server := httptest.NewServer(&TestOkHandler{ - T: t, - Expected: []string{ - fmt.Sprintf("{\"metrics\":[{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d},{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d}]}", now.Unix(), now.Unix()), - }, - }) - - defer server.Close() - - http := &Http{ - URL: server.URL, - Headers: map[string]string{ - "Content-Type": "application/json", - }, + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) } - jsonSerializer, _ := json.NewSerializer(time.Second) - http.SetSerializer(jsonSerializer) - - m1, _ := metric.New("cpu", cpuTags, cpuField, now) - m2, _ := metric.New("cpu", cpuTags, cpuField, now) - metrics := []telegraf.Metric{m1, m2} - - http.Connect() - err := http.Write(metrics) - - assert.Nil(t, err) -} - -func TestHttpWithoutContentType(t *testing.T) { - http := &Http{ - URL: "http://127.0.0.1:56879/correct/url", - } - - err := http.Connect() - - assert.Error(t, err) -} - -func TestHttpWithContentType(t *testing.T) { - http := &Http{ - URL: "http://127.0.0.1:56879/correct/url", - Headers: map[string]string{ - "Content-Type": "application/json", - }, - } - - err := http.Connect() - - assert.Nil(t, err) -} - -func TestImplementedInterfaceFunction(t *testing.T) { - http := &Http{ - URL: "http://127.0.0.1:56879/incorrect/url", - Headers: map[string]string{ - "Content-Type": "application/json", - }, - } - - assert.NotNil(t, http.SampleConfig()) - assert.NotNil(t, http.Description()) -} - -func resetCount() { - count = 0 }