http_listener input unit tests
This commit is contained in:
parent
097b1e09db
commit
c849b58de9
|
@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log.
|
||||||
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
|
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
|
||||||
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
|
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
|
||||||
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
|
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
|
||||||
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
|
- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin.
|
||||||
- [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql
|
- [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql
|
||||||
- [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output
|
- [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output
|
||||||
- [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion.
|
- [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion.
|
||||||
|
|
|
@ -14,7 +14,7 @@ type Buffer struct {
|
||||||
// total metrics added
|
// total metrics added
|
||||||
total int
|
total int
|
||||||
|
|
||||||
sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBuffer returns a Buffer
|
// NewBuffer returns a Buffer
|
||||||
|
@ -65,13 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
|
||||||
// the batch will be of maximum length batchSize. It can be less than batchSize,
|
// the batch will be of maximum length batchSize. It can be less than batchSize,
|
||||||
// if the length of Buffer is less than batchSize.
|
// if the length of Buffer is less than batchSize.
|
||||||
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
|
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
|
||||||
b.Lock()
|
b.mu.Lock()
|
||||||
n := min(len(b.buf), batchSize)
|
n := min(len(b.buf), batchSize)
|
||||||
out := make([]telegraf.Metric, n)
|
out := make([]telegraf.Metric, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
out[i] = <-b.buf
|
out[i] = <-b.buf
|
||||||
}
|
}
|
||||||
b.Unlock()
|
b.mu.Unlock()
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ const (
|
||||||
DEFAULT_MAX_LINE_SIZE = 64 * 1024
|
DEFAULT_MAX_LINE_SIZE = 64 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
type HttpListener struct {
|
type HTTPListener struct {
|
||||||
ServiceAddress string
|
ServiceAddress string
|
||||||
ReadTimeout internal.Duration
|
ReadTimeout internal.Duration
|
||||||
WriteTimeout internal.Duration
|
WriteTimeout internal.Duration
|
||||||
|
@ -63,24 +63,23 @@ const sampleConfig = `
|
||||||
max_line_size = 0
|
max_line_size = 0
|
||||||
`
|
`
|
||||||
|
|
||||||
func (h *HttpListener) SampleConfig() string {
|
func (h *HTTPListener) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HttpListener) Description() string {
|
func (h *HTTPListener) Description() string {
|
||||||
return "Influx HTTP write listener"
|
return "Influx HTTP write listener"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HttpListener) Gather(_ telegraf.Accumulator) error {
|
func (h *HTTPListener) Gather(_ telegraf.Accumulator) error {
|
||||||
log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated())
|
log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the http listener service.
|
// Start starts the http listener service.
|
||||||
func (h *HttpListener) Start(acc telegraf.Accumulator) error {
|
func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
defer h.mu.Unlock()
|
||||||
h.parser = influx.InfluxParser{}
|
|
||||||
|
|
||||||
if h.MaxBodySize == 0 {
|
if h.MaxBodySize == 0 {
|
||||||
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
|
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
|
||||||
|
@ -110,7 +109,7 @@ func (h *HttpListener) Start(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cleans up all resources
|
// Stop cleans up all resources
|
||||||
func (h *HttpListener) Stop() {
|
func (h *HTTPListener) Stop() {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
@ -124,7 +123,7 @@ func (h *HttpListener) Stop() {
|
||||||
// like server.Serve, httpListen will always return a non-nil error, for this
|
// like server.Serve, httpListen will always return a non-nil error, for this
|
||||||
// reason, the error returned should probably be ignored.
|
// reason, the error returned should probably be ignored.
|
||||||
// see https://golang.org/pkg/net/http/#Server.Serve
|
// see https://golang.org/pkg/net/http/#Server.Serve
|
||||||
func (h *HttpListener) httpListen() error {
|
func (h *HTTPListener) httpListen() error {
|
||||||
if h.ReadTimeout.Duration < time.Second {
|
if h.ReadTimeout.Duration < time.Second {
|
||||||
h.ReadTimeout.Duration = time.Second * 10
|
h.ReadTimeout.Duration = time.Second * 10
|
||||||
}
|
}
|
||||||
|
@ -141,7 +140,7 @@ func (h *HttpListener) httpListen() error {
|
||||||
return server.Serve(h.listener)
|
return server.Serve(h.listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
switch req.URL.Path {
|
switch req.URL.Path {
|
||||||
case "/write":
|
case "/write":
|
||||||
h.serveWrite(res, req)
|
h.serveWrite(res, req)
|
||||||
|
@ -161,7 +160,7 @@ func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
// Check that the content length is not too large for us to handle.
|
// Check that the content length is not too large for us to handle.
|
||||||
if req.ContentLength > h.MaxBodySize {
|
if req.ContentLength > h.MaxBodySize {
|
||||||
tooLarge(res)
|
tooLarge(res)
|
||||||
|
@ -171,8 +170,9 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
// Handle gzip request bodies
|
// Handle gzip request bodies
|
||||||
body := req.Body
|
body := req.Body
|
||||||
|
var err error
|
||||||
if req.Header.Get("Content-Encoding") == "gzip" {
|
if req.Header.Get("Content-Encoding") == "gzip" {
|
||||||
body, err := gzip.NewReader(req.Body)
|
body, err = gzip.NewReader(req.Body)
|
||||||
defer body.Close()
|
defer body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("E! " + err.Error())
|
log.Println("E! " + err.Error())
|
||||||
|
@ -185,7 +185,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
var return400 bool
|
var return400 bool
|
||||||
var hangingBytes bool
|
var hangingBytes bool
|
||||||
buf := h.pool.get()
|
buf := h.pool.get()
|
||||||
defer func() { h.pool.put(buf) }()
|
defer h.pool.put(buf)
|
||||||
bufStart := 0
|
bufStart := 0
|
||||||
for {
|
for {
|
||||||
n, err := io.ReadFull(body, buf[bufStart:])
|
n, err := io.ReadFull(body, buf[bufStart:])
|
||||||
|
@ -261,7 +261,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HttpListener) parse(b []byte, t time.Time) error {
|
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
||||||
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
@ -287,7 +287,7 @@ func badRequest(res http.ResponseWriter) {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("http_listener", func() telegraf.Input {
|
inputs.Add("http_listener", func() telegraf.Input {
|
||||||
return &HttpListener{
|
return &HTTPListener{
|
||||||
ServiceAddress: ":8186",
|
ServiceAddress: ":8186",
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -2,6 +2,7 @@ package http_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -26,15 +27,15 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
|
||||||
emptyMsg = ""
|
emptyMsg = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestHttpListener() *HttpListener {
|
func newTestHTTPListener() *HTTPListener {
|
||||||
listener := &HttpListener{
|
listener := &HTTPListener{
|
||||||
ServiceAddress: ":8186",
|
ServiceAddress: ":8186",
|
||||||
}
|
}
|
||||||
return listener
|
return listener
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteHTTP(t *testing.T) {
|
func TestWriteHTTP(t *testing.T) {
|
||||||
listener := newTestHttpListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
@ -81,7 +82,7 @@ func TestWriteHTTP(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
|
func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
|
||||||
listener := &HttpListener{
|
listener := &HTTPListener{
|
||||||
ServiceAddress: ":8296",
|
ServiceAddress: ":8296",
|
||||||
MaxLineSize: 128 * 1000,
|
MaxLineSize: 128 * 1000,
|
||||||
}
|
}
|
||||||
|
@ -92,15 +93,121 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
time.Sleep(time.Millisecond * 25)
|
||||||
|
|
||||||
// Post a gigantic metric to the listener and verify that an error is returned:
|
// Post a gigantic metric to the listener and verify that it writes OK this time:
|
||||||
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
|
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
|
||||||
|
listener := &HTTPListener{
|
||||||
|
ServiceAddress: ":8297",
|
||||||
|
MaxBodySize: 4096,
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 25)
|
||||||
|
|
||||||
|
resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 413, resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
|
||||||
|
listener := &HTTPListener{
|
||||||
|
ServiceAddress: ":8298",
|
||||||
|
MaxLineSize: 70,
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 25)
|
||||||
|
|
||||||
|
resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 15)
|
||||||
|
hostTags := []string{"server02", "server03",
|
||||||
|
"server04", "server05", "server06"}
|
||||||
|
for _, hostTag := range hostTags {
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": hostTag},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
|
||||||
|
listener := &HTTPListener{
|
||||||
|
ServiceAddress: ":8300",
|
||||||
|
MaxLineSize: 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 25)
|
||||||
|
|
||||||
|
resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 400, resp.StatusCode)
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 15)
|
||||||
|
hostTags := []string{"server02", "server03",
|
||||||
|
"server04", "server05", "server06"}
|
||||||
|
for _, hostTag := range hostTags {
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": hostTag},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test that writing gzipped data works
|
||||||
|
func TestWriteHTTPGzippedData(t *testing.T) {
|
||||||
|
listener := &HTTPListener{
|
||||||
|
ServiceAddress: ":8299",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 25)
|
||||||
|
|
||||||
|
data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data))
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
hostTags := []string{"server02", "server03",
|
||||||
|
"server04", "server05", "server06"}
|
||||||
|
for _, hostTag := range hostTags {
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": hostTag},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// writes 25,000 metrics to the listener with 10 different writers
|
// writes 25,000 metrics to the listener with 10 different writers
|
||||||
func TestWriteHTTPHighTraffic(t *testing.T) {
|
func TestWriteHTTPHighTraffic(t *testing.T) {
|
||||||
listener := &HttpListener{ServiceAddress: ":8286"}
|
listener := &HTTPListener{ServiceAddress: ":8286"}
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
@ -123,14 +230,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
time.Sleep(time.Millisecond * 50)
|
time.Sleep(time.Millisecond * 250)
|
||||||
listener.Gather(acc)
|
listener.Gather(acc)
|
||||||
|
|
||||||
require.Equal(t, int64(25000), int64(acc.NMetrics()))
|
require.Equal(t, int64(25000), int64(acc.NMetrics()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReceive404ForInvalidEndpoint(t *testing.T) {
|
func TestReceive404ForInvalidEndpoint(t *testing.T) {
|
||||||
listener := newTestHttpListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
@ -147,7 +254,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
|
||||||
func TestWriteHTTPInvalid(t *testing.T) {
|
func TestWriteHTTPInvalid(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
listener := newTestHttpListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
@ -164,7 +271,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
|
||||||
func TestWriteHTTPEmpty(t *testing.T) {
|
func TestWriteHTTPEmpty(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
listener := newTestHttpListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
@ -181,7 +288,7 @@ func TestWriteHTTPEmpty(t *testing.T) {
|
||||||
func TestQueryAndPingHTTP(t *testing.T) {
|
func TestQueryAndPingHTTP(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
listener := newTestHttpListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
|
Binary file not shown.
Loading…
Reference in New Issue