From 190a4128c57a7f1d83d48d4efcb3087ff645b125 Mon Sep 17 00:00:00 2001 From: Dark Date: Tue, 15 May 2018 09:15:40 +0900 Subject: [PATCH] Add HTTP output plugin (#2491) --- plugins/outputs/all/all.go | 1 + plugins/outputs/http/README.md | 28 ++++ plugins/outputs/http/http.go | 151 ++++++++++++++++++ plugins/outputs/http/http_test.go | 244 ++++++++++++++++++++++++++++++ 4 files changed, 424 insertions(+) create mode 100644 plugins/outputs/http/README.md create mode 100644 plugins/outputs/http/http.go create mode 100644 plugins/outputs/http/http_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 5e74eb796..94f5149b4 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -11,6 +11,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" + _ "github.com/influxdata/telegraf/plugins/outputs/http" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" diff --git a/plugins/outputs/http/README.md b/plugins/outputs/http/README.md new file mode 100644 index 000000000..b7dd75276 --- /dev/null +++ b/plugins/outputs/http/README.md @@ -0,0 +1,28 @@ +# HTTP Output Plugin + +This plugin writes to a HTTP Server using the `POST Method`. + +Data collected from telegraf is sent in the Request Body. + +### Configuration: + +```toml +# Send telegraf metrics to HTTP Server(s) +[[outputs.http]] + ## It requires a url name. + ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. + ## Note that not support the HTTPS. + url = "http://127.0.0.1:8080/metric" + ## Configure dial timeout in seconds. Default : 3 + timeout = 3 + ## http_headers option can add a custom header to the request. + ## Content-Type is required http header in http plugin. + ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. + [outputs.http.headers] + Content-Type = "plain/text" + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go new file mode 100644 index 000000000..e78031b8c --- /dev/null +++ b/plugins/outputs/http/http.go @@ -0,0 +1,151 @@ +package http + +import ( + "bytes" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "io/ioutil" + "net/http" + "strings" + "time" +) + +var sampleConfig = ` + ## It requires a url name. + ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. + ## Note that not support the HTTPS. + url = "http://127.0.0.1:8080/metric" + ## Configure dial timeout in seconds. Default : 3 + timeout = 3 + ## http_headers option can add a custom header to the request. + ## Content-Type is required http header in http plugin. + ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. + [outputs.http.headers] + Content-Type = "plain/text" + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +const ( + POST = "POST" + + DEFAULT_TIME_OUT = 3 + + CONTENT_TYPE = "content-type" +) + +type Http struct { + // http required option + URL string `toml:"url"` + Headers map[string]string + + // Option with http default value + Timeout int `toml:"timeout"` + + client http.Client + serializer serializers.Serializer +} + +func (h *Http) SetSerializer(serializer serializers.Serializer) { + h.serializer = serializer +} + +// Connect to the Output +func (h *Http) Connect() error { + h.client = http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }, + Timeout: time.Duration(h.Timeout) * time.Second, + } + + var isValid bool + + for k := range h.Headers { + if strings.ToLower(k) == CONTENT_TYPE { + isValid = true + } + } + + if !isValid { + return fmt.Errorf("E! httpHeader require content-type!") + } + + return nil +} + +// Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern. +func (h *Http) Close() error { + return nil +} + +// Description A plugin that can transmit metrics over HTTP +func (h *Http) Description() string { + return "A plugin that can transmit metrics over HTTP" +} + +// SampleConfig provides sample example for developer +func (h *Http) SampleConfig() string { + return sampleConfig +} + +// Writes metrics over HTTP POST +func (h *Http) Write(metrics []telegraf.Metric) error { + reqBody, err := h.serializer.SerializeBatch(metrics) + + if err != nil { + return fmt.Errorf("E! Error serializing some metrics: %s", err.Error()) + } + + if err := h.write(reqBody); err != nil { + return err + } + + return nil +} + +func (h *Http) write(reqBody []byte) error { + req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody)) + + for k, v := range h.Headers { + req.Header.Set(k, v) + } + + resp, err := h.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, err = ioutil.ReadAll(resp.Body) + + if err := h.isOk(resp, err); err != nil { + return err + } + + return nil +} + +func (h *Http) isOk(resp *http.Response, err error) error { + if resp == nil || err != nil { + return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error()) + } + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func init() { + outputs.Add("http", func() telegraf.Output { + return &Http{ + Timeout: DEFAULT_TIME_OUT, + } + }) +} diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go new file mode 100644 index 000000000..aa9c68eba --- /dev/null +++ b/plugins/outputs/http/http_test.go @@ -0,0 +1,244 @@ +package http + +import ( + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers/graphite" + "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +var ( + cpuTags = map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + + cpuField = map[string]interface{}{ + "usage_idle": float64(91.5), + } + + memTags = map[string]string{ + "host": "localhost", + "cpu": "mem", + "datacenter": "us-west-2", + } + + memField = map[string]interface{}{ + "used": float64(91.5), + } + + count int +) + +type TestOkHandler struct { + T *testing.T + Expected []string +} + +// The handler gets a new variable each time it receives a request, so it fetches an expected string based on global variable. +func (h TestOkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + actual, _ := ioutil.ReadAll(r.Body) + + assert.Equal(h.T, h.Expected[count], string(actual), fmt.Sprintf("%d Expected fail!", count)) + + count++ + + fmt.Fprint(w, "ok") +} + +type TestNotFoundHandler struct { +} + +func (h TestNotFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + http.NotFound(w, r) +} + +func TestWriteAllInputMetric(t *testing.T) { + now := time.Now() + + server := httptest.NewServer(&TestOkHandler{ + T: t, + Expected: []string{ + fmt.Sprintf("telegraf.cpu0.us-west-2.localhost.cpu.usage_idle 91.5 %d\ntelegraf.mem.us-west-2.localhost.mem.used 91.5 %d\n", now.Unix(), now.Unix()), + }, + }) + defer server.Close() + defer resetCount() + + m1, _ := metric.New("cpu", cpuTags, cpuField, now) + m2, _ := metric.New("mem", memTags, memField, now) + metrics := []telegraf.Metric{m1, m2} + + http := &Http{ + URL: server.URL, + Headers: map[string]string{ + "Content-Type": "plain/text", + }, + } + + http.SetSerializer(&graphite.GraphiteSerializer{ + Prefix: "telegraf", + Template: "tags.measurement.field", + }) + + http.Connect() + err := http.Write(metrics) + + assert.NoError(t, err) +} + +func TestHttpWriteWithUnexpected404StatusCode(t *testing.T) { + now := time.Now() + + server := httptest.NewServer(&TestNotFoundHandler{}) + defer server.Close() + + m, _ := metric.New("cpu", cpuTags, cpuField, now) + metrics := []telegraf.Metric{m} + + http := &Http{ + URL: server.URL, + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + http.SetSerializer(&graphite.GraphiteSerializer{ + Prefix: "telegraf", + Template: "tags.measurement.field", + }) + + http.Connect() + err := http.Write(metrics) + + assert.Error(t, err) +} + +func TestHttpWriteWithExpected404StatusCode(t *testing.T) { + now := time.Now() + + server := httptest.NewServer(&TestNotFoundHandler{}) + defer server.Close() + + m, _ := metric.New("cpu", cpuTags, cpuField, now) + metrics := []telegraf.Metric{m} + + http := &Http{ + URL: server.URL, + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + http.SetSerializer(&graphite.GraphiteSerializer{ + Prefix: "telegraf", + Template: "tags.measurement.field", + }) + + http.Connect() + err := http.Write(metrics) + + assert.Error(t, err) +} + +func TestHttpWriteWithIncorrectServerPort(t *testing.T) { + now := time.Now() + + m, _ := metric.New("cpu", cpuTags, cpuField, now) + metrics := []telegraf.Metric{m} + + http := &Http{ + URL: "http://127.0.0.1:56879/incorrect/url", + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + http.SetSerializer(&graphite.GraphiteSerializer{ + Prefix: "telegraf", + Template: "tags.measurement.field", + }) + + http.Connect() + err := http.Write(metrics) + + assert.Error(t, err) +} + +func TestHttpWriteWithHttpSerializer(t *testing.T) { + now := time.Now() + + server := httptest.NewServer(&TestOkHandler{ + T: t, + Expected: []string{ + fmt.Sprintf("{\"metrics\":[{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d},{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d}]}", now.Unix(), now.Unix()), + }, + }) + + defer server.Close() + + http := &Http{ + URL: server.URL, + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + jsonSerializer, _ := json.NewSerializer(time.Second) + http.SetSerializer(jsonSerializer) + + m1, _ := metric.New("cpu", cpuTags, cpuField, now) + m2, _ := metric.New("cpu", cpuTags, cpuField, now) + metrics := []telegraf.Metric{m1, m2} + + http.Connect() + err := http.Write(metrics) + + assert.Nil(t, err) +} + +func TestHttpWithoutContentType(t *testing.T) { + http := &Http{ + URL: "http://127.0.0.1:56879/correct/url", + } + + err := http.Connect() + + assert.Error(t, err) +} + +func TestHttpWithContentType(t *testing.T) { + http := &Http{ + URL: "http://127.0.0.1:56879/correct/url", + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + err := http.Connect() + + assert.Nil(t, err) +} + +func TestImplementedInterfaceFunction(t *testing.T) { + http := &Http{ + URL: "http://127.0.0.1:56879/incorrect/url", + Headers: map[string]string{ + "Content-Type": "application/json", + }, + } + + assert.NotNil(t, http.SampleConfig()) + assert.NotNil(t, http.Description()) +} + +func resetCount() { + count = 0 +}