diff --git a/CHANGELOG.md b/CHANGELOG.md index f380f19cf..fc36f594e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - [#2973](https://github.com/influxdata/telegraf/pull/2973): Change default prometheus_client port. - [#2661](https://github.com/influxdata/telegraf/pull/2661): Add fluentd input plugin. - [#2990](https://github.com/influxdata/telegraf/pull/2990): Add result_type field to net_response input plugin. +- [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener ### Bugfixes diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index 7baa0bffc..698f3aeee 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -30,6 +30,11 @@ This is a sample configuration for the plugin. ## 0 (default) is unlimited. # max_connections = 1024 + ## Read timeout. + ## Only applies to stream sockets (e.g. TCP). + ## 0 (default) is unlimited. + # read_timeout = "30s" + ## Maximum socket buffer size in bytes. ## For stream sockets, once the buffer fills up, the sender will start backing up. ## For datagram sockets, once the buffer fills up, metrics will start dropping. diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 839b28368..965a6a870 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -10,6 +10,8 @@ import ( "strings" "sync" + "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" @@ -91,7 +93,13 @@ func (ssl *streamSocketListener) read(c net.Conn) { defer c.Close() scnr := bufio.NewScanner(c) - for scnr.Scan() { + for { + if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 { + c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration)) + } + if !scnr.Scan() { + break + } metrics, err := ssl.Parse(scnr.Bytes()) if err != nil { ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err)) @@ -104,7 +112,9 @@ func (ssl *streamSocketListener) read(c net.Conn) { } if err := scnr.Err(); err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + if err, ok := err.(net.Error); ok && err.Timeout() { + log.Printf("D! Timeout in plugin [input.socket_listener]: %s", err) + } else if !strings.HasSuffix(err.Error(), ": use of closed network connection") { ssl.AddError(err) } } @@ -142,6 +152,7 @@ type SocketListener struct { ServiceAddress string MaxConnections int ReadBufferSize int + ReadTimeout *internal.Duration KeepAlivePeriod *internal.Duration parsers.Parser @@ -172,6 +183,11 @@ func (sl *SocketListener) SampleConfig() string { ## 0 (default) is unlimited. # max_connections = 1024 + ## Read timeout. + ## Only applies to stream sockets (e.g. TCP). + ## 0 (default) is unlimited. + # read_timeout = "30s" + ## Maximum socket buffer size in bytes. ## For stream sockets, once the buffer fills up, the sender will start backing up. ## For datagram sockets, once the buffer fills up, metrics will start dropping.