From 121d71f48dd1e98571b131e504cfec4794179824 Mon Sep 17 00:00:00 2001 From: ncohensm Date: Wed, 22 Jun 2016 17:22:00 -0700 Subject: [PATCH] add unit tests --- .../http_listener/http_listener_test.go | 227 +++--------------- 1 file changed, 38 insertions(+), 189 deletions(-) diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 927a791c3..fdf1354c2 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -1,48 +1,40 @@ package http_listener import ( - "fmt" - "net" "testing" "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "net/http" + "bytes" ) const ( testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" - testMsgs = ` -cpu_load_short,host=server02 value=12.0 1422568543702900257 + testMsgs = `cpu_load_short,host=server02 value=12.0 1422568543702900257 cpu_load_short,host=server03 value=12.0 1422568543702900257 cpu_load_short,host=server04 value=12.0 1422568543702900257 cpu_load_short,host=server05 value=12.0 1422568543702900257 cpu_load_short,host=server06 value=12.0 1422568543702900257 ` + badMsg = "blahblahblah: 42\n" ) -func newTestTcpListener() (*TcpListener, chan []byte) { - in := make(chan []byte, 1500) - listener := &TcpListener{ - ServiceAddress: ":8194", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - in: in, - done: make(chan struct{}), +func newTestHttpListener() (*HttpListener) { + listener := &HttpListener{ + ServiceAddress: ":8186", + ReadTimeout: "10", + WriteTimeout: "10", } - return listener, in + return listener } -func TestConnectTCP(t *testing.T) { - listener := TcpListener{ - ServiceAddress: ":8194", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - } +func TestWriteHTTP(t *testing.T) { + listener := newTestHttpListener() listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} @@ -50,19 +42,23 @@ func TestConnectTCP(t *testing.T) { defer listener.Stop() time.Sleep(time.Millisecond * 25) - conn, err := net.Dial("tcp", "127.0.0.1:8194") - require.NoError(t, err) - // send single message to socket - fmt.Fprintf(conn, testMsg) + // post single message to listener + var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + require.EqualValues(t, resp.StatusCode, 204) + time.Sleep(time.Millisecond * 15) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, ) - // send multiple messages to socket - fmt.Fprintf(conn, testMsgs) + // post multiple message to listener + resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.EqualValues(t, resp.StatusCode, 204) + time.Sleep(time.Millisecond * 15) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} @@ -74,13 +70,10 @@ func TestConnectTCP(t *testing.T) { } } -// Test that MaxTCPConections is respected -func TestConcurrentConns(t *testing.T) { - listener := TcpListener{ - ServiceAddress: ":8195", - AllowedPendingMessages: 10000, - MaxTCPConnections: 2, - } +func TestWriteHTTPInvalid(t *testing.T) { + time.Sleep(time.Millisecond * 250) + + listener := newTestHttpListener() listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} @@ -88,37 +81,17 @@ func TestConcurrentConns(t *testing.T) { defer listener.Stop() time.Sleep(time.Millisecond * 25) - _, err := net.Dial("tcp", "127.0.0.1:8195") - assert.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8195") - assert.NoError(t, err) - // Connection over the limit: - conn, err := net.Dial("tcp", "127.0.0.1:8195") - assert.NoError(t, err) - net.Dial("tcp", "127.0.0.1:8195") - buf := make([]byte, 1500) - n, err := conn.Read(buf) - assert.NoError(t, err) - assert.Equal(t, - "Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+ - "You may want to increase max_tcp_connections in"+ - " the Telegraf tcp listener configuration.\n", - string(buf[:n])) - - _, err = conn.Write([]byte(testMsg)) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 10) - assert.Zero(t, acc.NFields()) + // post single message to listener + var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) + require.NoError(t, err) + require.EqualValues(t, resp.StatusCode, 500) } -// Test that MaxTCPConections is respected when max==1 -func TestConcurrentConns1(t *testing.T) { - listener := TcpListener{ - ServiceAddress: ":8196", - AllowedPendingMessages: 10000, - MaxTCPConnections: 1, - } +func TestQueryHTTP(t *testing.T) { + time.Sleep(time.Millisecond * 250) + + listener := newTestHttpListener() listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} @@ -126,134 +99,10 @@ func TestConcurrentConns1(t *testing.T) { defer listener.Stop() time.Sleep(time.Millisecond * 25) - _, err := net.Dial("tcp", "127.0.0.1:8196") - assert.NoError(t, err) - // Connection over the limit: - conn, err := net.Dial("tcp", "127.0.0.1:8196") - assert.NoError(t, err) - net.Dial("tcp", "127.0.0.1:8196") - buf := make([]byte, 1500) - n, err := conn.Read(buf) - assert.NoError(t, err) - assert.Equal(t, - "Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+ - "You may want to increase max_tcp_connections in"+ - " the Telegraf tcp listener configuration.\n", - string(buf[:n])) - - _, err = conn.Write([]byte(testMsg)) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 10) - assert.Zero(t, acc.NFields()) + // post query to listener + var resp, err = http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) + require.NoError(t, err) + require.EqualValues(t, resp.StatusCode, 200) } -// Test that MaxTCPConections is respected -func TestCloseConcurrentConns(t *testing.T) { - listener := TcpListener{ - ServiceAddress: ":8195", - AllowedPendingMessages: 10000, - MaxTCPConnections: 2, - } - listener.parser, _ = parsers.NewInfluxParser() - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Start(acc)) - - time.Sleep(time.Millisecond * 25) - _, err := net.Dial("tcp", "127.0.0.1:8195") - assert.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8195") - assert.NoError(t, err) - - listener.Stop() -} - -func TestRunParser(t *testing.T) { - var testmsg = []byte(testMsg) - - listener, in := newTestTcpListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - listener.parser, _ = parsers.NewInfluxParser() - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - time.Sleep(time.Millisecond * 25) - listener.Gather(&acc) - - if a := acc.NFields(); a != 1 { - t.Errorf("got %v, expected %v", a, 1) - } - - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) -} - -func TestRunParserInvalidMsg(t *testing.T) { - var testmsg = []byte("cpu_load_short") - - listener, in := newTestTcpListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - listener.parser, _ = parsers.NewInfluxParser() - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - time.Sleep(time.Millisecond * 25) - - if a := acc.NFields(); a != 0 { - t.Errorf("got %v, expected %v", a, 0) - } -} - -func TestRunParserGraphiteMsg(t *testing.T) { - var testmsg = []byte("cpu.load.graphite 12 1454780029") - - listener, in := newTestTcpListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - time.Sleep(time.Millisecond * 25) - listener.Gather(&acc) - - acc.AssertContainsFields(t, "cpu_load_graphite", - map[string]interface{}{"value": float64(12)}) -} - -func TestRunParserJSONMsg(t *testing.T) { - var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") - - listener, in := newTestTcpListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - time.Sleep(time.Millisecond * 25) - listener.Gather(&acc) - - acc.AssertContainsFields(t, "udp_json_test", - map[string]interface{}{ - "a": float64(5), - "b_c": float64(6), - }) -}