diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 0f426f809..f0ad5752e 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -35,6 +35,7 @@ type HTTPListener struct { WriteTimeout internal.Duration MaxBodySize int64 MaxLineSize int + Port int mu sync.Mutex wg sync.WaitGroup @@ -124,6 +125,7 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { return err } h.listener = listener + h.Port = listener.Addr().(*net.TCPAddr).Port h.wg.Add(1) go func() { diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 7e6fbc8ab..41c0e9db8 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -4,6 +4,8 @@ import ( "bytes" "io/ioutil" "net/http" + "net/url" + "strconv" "sync" "testing" @@ -30,11 +32,21 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257 func newTestHTTPListener() *HTTPListener { listener := &HTTPListener{ - ServiceAddress: ":8186", + ServiceAddress: ":0", } return listener } +func createURL(listener *HTTPListener, path string, rawquery string) string { + u := url.URL{ + Scheme: "http", + Host: "localhost:" + strconv.Itoa(listener.Port), + Path: path, + RawQuery: rawquery, + } + return u.String() +} + func TestWriteHTTP(t *testing.T) { listener := newTestHTTPListener() @@ -43,7 +55,7 @@ func TestWriteHTTP(t *testing.T) { defer listener.Stop() // post single message to listener - resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) @@ -54,7 +66,7 @@ func TestWriteHTTP(t *testing.T) { ) // post multiple message to listener - resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs))) + resp, err = http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) @@ -69,7 +81,7 @@ func TestWriteHTTP(t *testing.T) { } // Post a gigantic metric to the listener and verify that an error is returned: - resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) + resp, err = http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 400, resp.StatusCode) @@ -89,7 +101,7 @@ func TestWriteHTTPNoNewline(t *testing.T) { defer listener.Stop() // post single message to listener - resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) @@ -102,7 +114,7 @@ func TestWriteHTTPNoNewline(t *testing.T) { func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { listener := &HTTPListener{ - ServiceAddress: ":8296", + ServiceAddress: ":0", MaxLineSize: 128 * 1000, } @@ -111,14 +123,14 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { defer listener.Stop() // Post a gigantic metric to the listener and verify that it writes OK this time: - resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } func TestWriteHTTPVerySmallMaxBody(t *testing.T) { listener := &HTTPListener{ - ServiceAddress: ":8297", + ServiceAddress: ":0", MaxBodySize: 4096, } @@ -126,14 +138,14 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) + resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 413, resp.StatusCode) } func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { listener := &HTTPListener{ - ServiceAddress: ":8298", + ServiceAddress: ":0", MaxLineSize: 70, } @@ -141,7 +153,7 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) + resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) @@ -158,7 +170,7 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { func TestWriteHTTPLargeLinesSkipped(t *testing.T) { listener := &HTTPListener{ - ServiceAddress: ":8300", + ServiceAddress: ":0", MaxLineSize: 100, } @@ -166,7 +178,7 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) + resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) require.NoError(t, err) require.EqualValues(t, 400, resp.StatusCode) @@ -183,9 +195,7 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) { // test that writing gzipped data works func TestWriteHTTPGzippedData(t *testing.T) { - listener := &HTTPListener{ - ServiceAddress: ":8299", - } + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -194,7 +204,7 @@ func TestWriteHTTPGzippedData(t *testing.T) { data, err := ioutil.ReadFile("./testdata/testmsgs.gz") require.NoError(t, err) - req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data)) + req, err := http.NewRequest("POST", createURL(listener, "/write", ""), bytes.NewBuffer(data)) require.NoError(t, err) req.Header.Set("Content-Encoding", "gzip") @@ -216,7 +226,7 @@ func TestWriteHTTPGzippedData(t *testing.T) { // writes 25,000 metrics to the listener with 10 different writers func TestWriteHTTPHighTraffic(t *testing.T) { - listener := &HTTPListener{ServiceAddress: ":8286"} + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -229,7 +239,7 @@ func TestWriteHTTPHighTraffic(t *testing.T) { go func(innerwg *sync.WaitGroup) { defer innerwg.Done() for i := 0; i < 500; i++ { - resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } @@ -251,7 +261,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { defer listener.Stop() // post single message to listener - resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg))) + resp, err := http.Post(createURL(listener, "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) require.NoError(t, err) require.EqualValues(t, 404, resp.StatusCode) } @@ -264,7 +274,7 @@ func TestWriteHTTPInvalid(t *testing.T) { defer listener.Stop() // post single message to listener - resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(badMsg))) require.NoError(t, err) require.EqualValues(t, 400, resp.StatusCode) } @@ -277,7 +287,7 @@ func TestWriteHTTPEmpty(t *testing.T) { defer listener.Stop() // post single message to listener - resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg))) + resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(emptyMsg))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } @@ -290,12 +300,13 @@ func TestQueryAndPingHTTP(t *testing.T) { defer listener.Stop() // post query to listener - resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) + resp, err := http.Post( + createURL(listener, "/query", "db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22"), "", nil) require.NoError(t, err) require.EqualValues(t, 200, resp.StatusCode) // post ping to listener - resp, err = http.Post("http://localhost:8186/ping", "", nil) + resp, err = http.Post(createURL(listener, "/ping", ""), "", nil) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) }