Add HTTP output plugin (#2491)
This commit is contained in:
		
							parent
							
								
									d19a33dd6f
								
							
						
					
					
						commit
						190a4128c5
					
				|  | @ -11,6 +11,7 @@ import ( | |||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/file" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/graphite" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/graylog" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/http" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/influxdb" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/instrumental" | ||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/kafka" | ||||
|  |  | |||
|  | @ -0,0 +1,28 @@ | |||
| # HTTP Output Plugin | ||||
| 
 | ||||
| This plugin writes to a HTTP Server using the `POST Method`. | ||||
| 
 | ||||
| Data collected from telegraf is sent in the Request Body. | ||||
| 
 | ||||
| ### Configuration: | ||||
| 
 | ||||
| ```toml | ||||
| # Send telegraf metrics to HTTP Server(s) | ||||
| [[outputs.http]] | ||||
|   ## It requires a url name. | ||||
|   ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. | ||||
|   ## Note that not support the HTTPS. | ||||
|   url = "http://127.0.0.1:8080/metric" | ||||
|   ## Configure dial timeout in seconds. Default : 3 | ||||
|   timeout = 3 | ||||
|   ## http_headers option can add a custom header to the request. | ||||
|   ## Content-Type is required http header in http plugin. | ||||
|   ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. | ||||
|   [outputs.http.headers] | ||||
|     Content-Type = "plain/text" | ||||
|   ## Data format to output. | ||||
|   ## Each data format has it's own unique set of configuration options, read | ||||
|   ## more about them here: | ||||
|   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||||
|   data_format = "influx" | ||||
| ``` | ||||
|  | @ -0,0 +1,151 @@ | |||
| package http | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/plugins/outputs" | ||||
| 	"github.com/influxdata/telegraf/plugins/serializers" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| var sampleConfig = ` | ||||
|   ## It requires a url name. | ||||
|   ## Will be transmitted telegraf metrics to the HTTP Server using the below URL. | ||||
|   ## Note that not support the HTTPS. | ||||
|   url = "http://127.0.0.1:8080/metric" | ||||
|   ## Configure dial timeout in seconds. Default : 3 | ||||
|   timeout = 3 | ||||
|   ## http_headers option can add a custom header to the request. | ||||
|   ## Content-Type is required http header in http plugin. | ||||
|   ## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out. | ||||
|   [outputs.http.headers] | ||||
|     Content-Type = "plain/text" | ||||
|   ## Data format to output. | ||||
|   ## Each data format has it's own unique set of configuration options, read | ||||
|   ## more about them here: | ||||
|   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
 | ||||
|   data_format = "influx" | ||||
| ` | ||||
| 
 | ||||
| const ( | ||||
| 	POST = "POST" | ||||
| 
 | ||||
| 	DEFAULT_TIME_OUT = 3 | ||||
| 
 | ||||
| 	CONTENT_TYPE = "content-type" | ||||
| ) | ||||
| 
 | ||||
| type Http struct { | ||||
| 	// http required option
 | ||||
| 	URL     string `toml:"url"` | ||||
| 	Headers map[string]string | ||||
| 
 | ||||
| 	// Option with http default value
 | ||||
| 	Timeout int `toml:"timeout"` | ||||
| 
 | ||||
| 	client     http.Client | ||||
| 	serializer serializers.Serializer | ||||
| } | ||||
| 
 | ||||
| func (h *Http) SetSerializer(serializer serializers.Serializer) { | ||||
| 	h.serializer = serializer | ||||
| } | ||||
| 
 | ||||
| // Connect to the Output
 | ||||
| func (h *Http) Connect() error { | ||||
| 	h.client = http.Client{ | ||||
| 		Transport: &http.Transport{ | ||||
| 			Proxy: http.ProxyFromEnvironment, | ||||
| 		}, | ||||
| 		Timeout: time.Duration(h.Timeout) * time.Second, | ||||
| 	} | ||||
| 
 | ||||
| 	var isValid bool | ||||
| 
 | ||||
| 	for k := range h.Headers { | ||||
| 		if strings.ToLower(k) == CONTENT_TYPE { | ||||
| 			isValid = true | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if !isValid { | ||||
| 		return fmt.Errorf("E! httpHeader require content-type!") | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern.
 | ||||
| func (h *Http) Close() error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Description A plugin that can transmit metrics over HTTP
 | ||||
| func (h *Http) Description() string { | ||||
| 	return "A plugin that can transmit metrics over HTTP" | ||||
| } | ||||
| 
 | ||||
| // SampleConfig provides sample example for developer
 | ||||
| func (h *Http) SampleConfig() string { | ||||
| 	return sampleConfig | ||||
| } | ||||
| 
 | ||||
| // Writes metrics over HTTP POST
 | ||||
| func (h *Http) Write(metrics []telegraf.Metric) error { | ||||
| 	reqBody, err := h.serializer.SerializeBatch(metrics) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("E! Error serializing some metrics: %s", err.Error()) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := h.write(reqBody); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (h *Http) write(reqBody []byte) error { | ||||
| 	req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody)) | ||||
| 
 | ||||
| 	for k, v := range h.Headers { | ||||
| 		req.Header.Set(k, v) | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := h.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 	_, err = ioutil.ReadAll(resp.Body) | ||||
| 
 | ||||
| 	if err := h.isOk(resp, err); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (h *Http) isOk(resp *http.Response, err error) error { | ||||
| 	if resp == nil || err != nil { | ||||
| 		return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error()) | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode < 200 || resp.StatusCode > 209 { | ||||
| 		return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	outputs.Add("http", func() telegraf.Output { | ||||
| 		return &Http{ | ||||
| 			Timeout: DEFAULT_TIME_OUT, | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|  | @ -0,0 +1,244 @@ | |||
| package http | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/metric" | ||||
| 	"github.com/influxdata/telegraf/plugins/serializers/graphite" | ||||
| 	"github.com/influxdata/telegraf/plugins/serializers/json" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	cpuTags = map[string]string{ | ||||
| 		"host":       "localhost", | ||||
| 		"cpu":        "cpu0", | ||||
| 		"datacenter": "us-west-2", | ||||
| 	} | ||||
| 
 | ||||
| 	cpuField = map[string]interface{}{ | ||||
| 		"usage_idle": float64(91.5), | ||||
| 	} | ||||
| 
 | ||||
| 	memTags = map[string]string{ | ||||
| 		"host":       "localhost", | ||||
| 		"cpu":        "mem", | ||||
| 		"datacenter": "us-west-2", | ||||
| 	} | ||||
| 
 | ||||
| 	memField = map[string]interface{}{ | ||||
| 		"used": float64(91.5), | ||||
| 	} | ||||
| 
 | ||||
| 	count int | ||||
| ) | ||||
| 
 | ||||
| type TestOkHandler struct { | ||||
| 	T        *testing.T | ||||
| 	Expected []string | ||||
| } | ||||
| 
 | ||||
| // The handler gets a new variable each time it receives a request, so it fetches an expected string based on global variable.
 | ||||
| func (h TestOkHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	actual, _ := ioutil.ReadAll(r.Body) | ||||
| 
 | ||||
| 	assert.Equal(h.T, h.Expected[count], string(actual), fmt.Sprintf("%d Expected fail!", count)) | ||||
| 
 | ||||
| 	count++ | ||||
| 
 | ||||
| 	fmt.Fprint(w, "ok") | ||||
| } | ||||
| 
 | ||||
| type TestNotFoundHandler struct { | ||||
| } | ||||
| 
 | ||||
| func (h TestNotFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	http.NotFound(w, r) | ||||
| } | ||||
| 
 | ||||
| func TestWriteAllInputMetric(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	server := httptest.NewServer(&TestOkHandler{ | ||||
| 		T: t, | ||||
| 		Expected: []string{ | ||||
| 			fmt.Sprintf("telegraf.cpu0.us-west-2.localhost.cpu.usage_idle 91.5 %d\ntelegraf.mem.us-west-2.localhost.mem.used 91.5 %d\n", now.Unix(), now.Unix()), | ||||
| 		}, | ||||
| 	}) | ||||
| 	defer server.Close() | ||||
| 	defer resetCount() | ||||
| 
 | ||||
| 	m1, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	m2, _ := metric.New("mem", memTags, memField, now) | ||||
| 	metrics := []telegraf.Metric{m1, m2} | ||||
| 
 | ||||
| 	http := &Http{ | ||||
| 		URL: server.URL, | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "plain/text", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	http.SetSerializer(&graphite.GraphiteSerializer{ | ||||
| 		Prefix:   "telegraf", | ||||
| 		Template: "tags.measurement.field", | ||||
| 	}) | ||||
| 
 | ||||
| 	http.Connect() | ||||
| 	err := http.Write(metrics) | ||||
| 
 | ||||
| 	assert.NoError(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWriteWithUnexpected404StatusCode(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	server := httptest.NewServer(&TestNotFoundHandler{}) | ||||
| 	defer server.Close() | ||||
| 
 | ||||
| 	m, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	metrics := []telegraf.Metric{m} | ||||
| 
 | ||||
| 	http := &Http{ | ||||
| 		URL: server.URL, | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	http.SetSerializer(&graphite.GraphiteSerializer{ | ||||
| 		Prefix:   "telegraf", | ||||
| 		Template: "tags.measurement.field", | ||||
| 	}) | ||||
| 
 | ||||
| 	http.Connect() | ||||
| 	err := http.Write(metrics) | ||||
| 
 | ||||
| 	assert.Error(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWriteWithExpected404StatusCode(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	server := httptest.NewServer(&TestNotFoundHandler{}) | ||||
| 	defer server.Close() | ||||
| 
 | ||||
| 	m, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	metrics := []telegraf.Metric{m} | ||||
| 
 | ||||
| 	http := &Http{ | ||||
| 		URL: server.URL, | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	http.SetSerializer(&graphite.GraphiteSerializer{ | ||||
| 		Prefix:   "telegraf", | ||||
| 		Template: "tags.measurement.field", | ||||
| 	}) | ||||
| 
 | ||||
| 	http.Connect() | ||||
| 	err := http.Write(metrics) | ||||
| 
 | ||||
| 	assert.Error(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWriteWithIncorrectServerPort(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	m, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	metrics := []telegraf.Metric{m} | ||||
| 
 | ||||
| 	http := &Http{ | ||||
| 		URL: "http://127.0.0.1:56879/incorrect/url", | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	http.SetSerializer(&graphite.GraphiteSerializer{ | ||||
| 		Prefix:   "telegraf", | ||||
| 		Template: "tags.measurement.field", | ||||
| 	}) | ||||
| 
 | ||||
| 	http.Connect() | ||||
| 	err := http.Write(metrics) | ||||
| 
 | ||||
| 	assert.Error(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWriteWithHttpSerializer(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	server := httptest.NewServer(&TestOkHandler{ | ||||
| 		T: t, | ||||
| 		Expected: []string{ | ||||
| 			fmt.Sprintf("{\"metrics\":[{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d},{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\",\"datacenter\":\"us-west-2\",\"host\":\"localhost\"},\"timestamp\":%d}]}", now.Unix(), now.Unix()), | ||||
| 		}, | ||||
| 	}) | ||||
| 
 | ||||
| 	defer server.Close() | ||||
| 
 | ||||
| 	http := &Http{ | ||||
| 		URL: server.URL, | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 	jsonSerializer, _ := json.NewSerializer(time.Second) | ||||
| 	http.SetSerializer(jsonSerializer) | ||||
| 
 | ||||
| 	m1, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	m2, _ := metric.New("cpu", cpuTags, cpuField, now) | ||||
| 	metrics := []telegraf.Metric{m1, m2} | ||||
| 
 | ||||
| 	http.Connect() | ||||
| 	err := http.Write(metrics) | ||||
| 
 | ||||
| 	assert.Nil(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWithoutContentType(t *testing.T) { | ||||
| 	http := &Http{ | ||||
| 		URL: "http://127.0.0.1:56879/correct/url", | ||||
| 	} | ||||
| 
 | ||||
| 	err := http.Connect() | ||||
| 
 | ||||
| 	assert.Error(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHttpWithContentType(t *testing.T) { | ||||
| 	http := &Http{ | ||||
| 		URL: "http://127.0.0.1:56879/correct/url", | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	err := http.Connect() | ||||
| 
 | ||||
| 	assert.Nil(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestImplementedInterfaceFunction(t *testing.T) { | ||||
| 	http := &Http{ | ||||
| 		URL: "http://127.0.0.1:56879/incorrect/url", | ||||
| 		Headers: map[string]string{ | ||||
| 			"Content-Type": "application/json", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	assert.NotNil(t, http.SampleConfig()) | ||||
| 	assert.NotNil(t, http.Description()) | ||||
| } | ||||
| 
 | ||||
| func resetCount() { | ||||
| 	count = 0 | ||||
| } | ||||
		Loading…
	
		Reference in New Issue