diff --git a/CHANGELOG.md b/CHANGELOG.md index 6723ca112..00310c38c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ - [#1896](https://github.com/influxdata/telegraf/issues/1896): Fix various mysql data type conversions. - [#3810](https://github.com/influxdata/telegraf/issues/3810): Fix metric buffer limit in internal plugin after reload. - [#3801](https://github.com/influxdata/telegraf/issues/3801): Fix panic in http_response on invalid regex. +- [#3973](https://github.com/influxdata/telegraf/issues/3873): Fix socket_listener setting ReadBufferSize on tcp sockets. ## v1.5.3 [unreleased] diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 965a6a870..2ad6c4264 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -26,6 +26,8 @@ type streamSocketListener struct { net.Listener *SocketListener + sockType string + connections map[string]net.Conn connectionsMtx sync.Mutex } @@ -42,6 +44,14 @@ func (ssl *streamSocketListener) listen() { break } + if ssl.ReadBufferSize > 0 { + if srb, ok := c.(setReadBufferer); ok { + srb.SetReadBuffer(ssl.ReadBufferSize) + } else { + log.Printf("W! Unable to set read buffer on a %s socket", ssl.sockType) + } + } + ssl.connectionsMtx.Lock() if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections { ssl.connectionsMtx.Unlock() @@ -237,17 +247,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return err } - if sl.ReadBufferSize > 0 { - if srb, ok := l.(setReadBufferer); ok { - srb.SetReadBuffer(sl.ReadBufferSize) - } else { - log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) - } - } - ssl := &streamSocketListener{ Listener: l, SocketListener: sl, + sockType: spl[0], } sl.Closer = ssl diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index b263e5082..4e8335699 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -1,6 +1,8 @@ package socket_listener import ( + "bytes" + "log" "net" "os" "testing" @@ -11,9 +13,24 @@ import ( "github.com/stretchr/testify/require" ) +// testEmptyLog is a helper function to ensure no data is written to log. +// Should be called at the start of the test, and returns a function which should run at the end. +func testEmptyLog(t *testing.T) func() { + buf := bytes.NewBuffer(nil) + log.SetOutput(buf) + + return func() { + log.SetOutput(os.Stderr) + assert.Empty(t, string(buf.Bytes()), "log not empty") + } +} + func TestSocketListener_tcp(t *testing.T) { + defer testEmptyLog(t)() + sl := newSocketListener() sl.ServiceAddress = "tcp://127.0.0.1:0" + sl.ReadBufferSize = 1024 acc := &testutil.Accumulator{} err := sl.Start(acc) @@ -27,8 +44,11 @@ func TestSocketListener_tcp(t *testing.T) { } func TestSocketListener_udp(t *testing.T) { + defer testEmptyLog(t)() + sl := newSocketListener() sl.ServiceAddress = "udp://127.0.0.1:0" + sl.ReadBufferSize = 1024 acc := &testutil.Accumulator{} err := sl.Start(acc) @@ -42,9 +62,12 @@ func TestSocketListener_udp(t *testing.T) { } func TestSocketListener_unix(t *testing.T) { + defer testEmptyLog(t)() + os.Create("/tmp/telegraf_test.sock") sl := newSocketListener() sl.ServiceAddress = "unix:///tmp/telegraf_test.sock" + sl.ReadBufferSize = 1024 acc := &testutil.Accumulator{} err := sl.Start(acc) @@ -58,9 +81,12 @@ func TestSocketListener_unix(t *testing.T) { } func TestSocketListener_unixgram(t *testing.T) { + defer testEmptyLog(t)() + os.Create("/tmp/telegraf_test.sock") sl := newSocketListener() sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock" + sl.ReadBufferSize = 1024 acc := &testutil.Accumulator{} err := sl.Start(acc)