Fix socket_listener setting ReadBufferSize on TCP sockets (#3874)

This commit is contained in:
Patrick Hemmer 2018-03-09 12:44:35 -05:00 committed by Daniel Nelson
parent 6437e23dd4
commit 48cbdbcdde
3 changed files with 38 additions and 8 deletions

View File

@ -87,6 +87,7 @@
- [#1896](https://github.com/influxdata/telegraf/issues/1896): Fix various mysql data type conversions. - [#1896](https://github.com/influxdata/telegraf/issues/1896): Fix various mysql data type conversions.
- [#3810](https://github.com/influxdata/telegraf/issues/3810): Fix metric buffer limit in internal plugin after reload. - [#3810](https://github.com/influxdata/telegraf/issues/3810): Fix metric buffer limit in internal plugin after reload.
- [#3801](https://github.com/influxdata/telegraf/issues/3801): Fix panic in http_response on invalid regex. - [#3801](https://github.com/influxdata/telegraf/issues/3801): Fix panic in http_response on invalid regex.
- [#3973](https://github.com/influxdata/telegraf/issues/3873): Fix socket_listener setting ReadBufferSize on tcp sockets.
## v1.5.3 [unreleased] ## v1.5.3 [unreleased]

View File

@ -26,6 +26,8 @@ type streamSocketListener struct {
net.Listener net.Listener
*SocketListener *SocketListener
sockType string
connections map[string]net.Conn connections map[string]net.Conn
connectionsMtx sync.Mutex connectionsMtx sync.Mutex
} }
@ -42,6 +44,14 @@ func (ssl *streamSocketListener) listen() {
break break
} }
if ssl.ReadBufferSize > 0 {
if srb, ok := c.(setReadBufferer); ok {
srb.SetReadBuffer(ssl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", ssl.sockType)
}
}
ssl.connectionsMtx.Lock() ssl.connectionsMtx.Lock()
if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections { if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections {
ssl.connectionsMtx.Unlock() ssl.connectionsMtx.Unlock()
@ -237,17 +247,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
return err return err
} }
if sl.ReadBufferSize > 0 {
if srb, ok := l.(setReadBufferer); ok {
srb.SetReadBuffer(sl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", spl[0])
}
}
ssl := &streamSocketListener{ ssl := &streamSocketListener{
Listener: l, Listener: l,
SocketListener: sl, SocketListener: sl,
sockType: spl[0],
} }
sl.Closer = ssl sl.Closer = ssl

View File

@ -1,6 +1,8 @@
package socket_listener package socket_listener
import ( import (
"bytes"
"log"
"net" "net"
"os" "os"
"testing" "testing"
@ -11,9 +13,24 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// testEmptyLog is a helper function to ensure no data is written to log.
// Should be called at the start of the test, and returns a function which should run at the end.
func testEmptyLog(t *testing.T) func() {
buf := bytes.NewBuffer(nil)
log.SetOutput(buf)
return func() {
log.SetOutput(os.Stderr)
assert.Empty(t, string(buf.Bytes()), "log not empty")
}
}
func TestSocketListener_tcp(t *testing.T) { func TestSocketListener_tcp(t *testing.T) {
defer testEmptyLog(t)()
sl := newSocketListener() sl := newSocketListener()
sl.ServiceAddress = "tcp://127.0.0.1:0" sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = 1024
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
err := sl.Start(acc) err := sl.Start(acc)
@ -27,8 +44,11 @@ func TestSocketListener_tcp(t *testing.T) {
} }
func TestSocketListener_udp(t *testing.T) { func TestSocketListener_udp(t *testing.T) {
defer testEmptyLog(t)()
sl := newSocketListener() sl := newSocketListener()
sl.ServiceAddress = "udp://127.0.0.1:0" sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = 1024
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
err := sl.Start(acc) err := sl.Start(acc)
@ -42,9 +62,12 @@ func TestSocketListener_udp(t *testing.T) {
} }
func TestSocketListener_unix(t *testing.T) { func TestSocketListener_unix(t *testing.T) {
defer testEmptyLog(t)()
os.Create("/tmp/telegraf_test.sock") os.Create("/tmp/telegraf_test.sock")
sl := newSocketListener() sl := newSocketListener()
sl.ServiceAddress = "unix:///tmp/telegraf_test.sock" sl.ServiceAddress = "unix:///tmp/telegraf_test.sock"
sl.ReadBufferSize = 1024
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
err := sl.Start(acc) err := sl.Start(acc)
@ -58,9 +81,12 @@ func TestSocketListener_unix(t *testing.T) {
} }
func TestSocketListener_unixgram(t *testing.T) { func TestSocketListener_unixgram(t *testing.T) {
defer testEmptyLog(t)()
os.Create("/tmp/telegraf_test.sock") os.Create("/tmp/telegraf_test.sock")
sl := newSocketListener() sl := newSocketListener()
sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock" sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock"
sl.ReadBufferSize = 1024
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
err := sl.Start(acc) err := sl.Start(acc)