add keep-alive support to socket_listener & socket_writer (#2697)
closes #2635
This commit is contained in:
parent
ddc2f64593
commit
b1a2f896a2
|
@ -74,6 +74,7 @@ be deprecated eventually.
|
||||||
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
|
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
|
||||||
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
|
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
|
||||||
- [#2575](https://github.com/influxdata/telegraf/issues/2575): Add diskio input for Darwin
|
- [#2575](https://github.com/influxdata/telegraf/issues/2575): Add diskio input for Darwin
|
||||||
|
- [#2635](https://github.com/influxdata/telegraf/issues/2635): Add TCP keep-alive to socket_listener & socket_writer
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,12 @@ This is a sample configuration for the plugin.
|
||||||
## Defaults to the OS default.
|
## Defaults to the OS default.
|
||||||
# read_buffer_size = 65535
|
# read_buffer_size = 65535
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
)
|
)
|
||||||
|
@ -47,6 +48,11 @@ func (ssl *streamSocketListener) listen() {
|
||||||
}
|
}
|
||||||
ssl.connections[c.RemoteAddr().String()] = c
|
ssl.connections[c.RemoteAddr().String()] = c
|
||||||
ssl.connectionsMtx.Unlock()
|
ssl.connectionsMtx.Unlock()
|
||||||
|
|
||||||
|
if err := ssl.setKeepAlive(c); err != nil {
|
||||||
|
ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
|
||||||
|
}
|
||||||
|
|
||||||
go ssl.read(c)
|
go ssl.read(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,6 +63,23 @@ func (ssl *streamSocketListener) listen() {
|
||||||
ssl.connectionsMtx.Unlock()
|
ssl.connectionsMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
|
||||||
|
if ssl.KeepAlivePeriod == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
tcpc, ok := c.(*net.TCPConn)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(ssl.ServiceAddress, "://", 2)[0])
|
||||||
|
}
|
||||||
|
if ssl.KeepAlivePeriod.Duration == 0 {
|
||||||
|
return tcpc.SetKeepAlive(false)
|
||||||
|
}
|
||||||
|
if err := tcpc.SetKeepAlive(true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tcpc.SetKeepAlivePeriod(ssl.KeepAlivePeriod.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
func (ssl *streamSocketListener) removeConnection(c net.Conn) {
|
func (ssl *streamSocketListener) removeConnection(c net.Conn) {
|
||||||
ssl.connectionsMtx.Lock()
|
ssl.connectionsMtx.Lock()
|
||||||
delete(ssl.connections, c.RemoteAddr().String())
|
delete(ssl.connections, c.RemoteAddr().String())
|
||||||
|
@ -116,9 +139,10 @@ func (psl *packetSocketListener) listen() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type SocketListener struct {
|
type SocketListener struct {
|
||||||
ServiceAddress string
|
ServiceAddress string
|
||||||
MaxConnections int
|
MaxConnections int
|
||||||
ReadBufferSize int
|
ReadBufferSize int
|
||||||
|
KeepAlivePeriod *internal.Duration
|
||||||
|
|
||||||
parsers.Parser
|
parsers.Parser
|
||||||
telegraf.Accumulator
|
telegraf.Accumulator
|
||||||
|
@ -154,6 +178,12 @@ func (sl *SocketListener) SampleConfig() string {
|
||||||
## Defaults to the OS default.
|
## Defaults to the OS default.
|
||||||
# read_buffer_size = 65535
|
# read_buffer_size = 65535
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has it's own unique set of configuration options, read
|
## Each data format has it's own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
@ -19,6 +19,12 @@ It can output data in any of the [supported output formats](https://github.com/i
|
||||||
# address = "unix:///tmp/telegraf.sock"
|
# address = "unix:///tmp/telegraf.sock"
|
||||||
# address = "unixgram:///tmp/telegraf.sock"
|
# address = "unixgram:///tmp/telegraf.sock"
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to generate.
|
## Data format to generate.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
@ -2,16 +2,19 @@ package socket_writer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SocketWriter struct {
|
type SocketWriter struct {
|
||||||
Address string
|
Address string
|
||||||
|
KeepAlivePeriod *internal.Duration
|
||||||
|
|
||||||
serializers.Serializer
|
serializers.Serializer
|
||||||
|
|
||||||
|
@ -36,6 +39,12 @@ func (sw *SocketWriter) SampleConfig() string {
|
||||||
# address = "unix:///tmp/telegraf.sock"
|
# address = "unix:///tmp/telegraf.sock"
|
||||||
# address = "unixgram:///tmp/telegraf.sock"
|
# address = "unixgram:///tmp/telegraf.sock"
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
## Data format to generate.
|
## Data format to generate.
|
||||||
## Each data format has it's own unique set of configuration options, read
|
## Each data format has it's own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
@ -59,10 +68,31 @@ func (sw *SocketWriter) Connect() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := sw.setKeepAlive(c); err != nil {
|
||||||
|
log.Printf("unable to configure keep alive (%s): %s", sw.Address, err)
|
||||||
|
}
|
||||||
|
|
||||||
sw.Conn = c
|
sw.Conn = c
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sw *SocketWriter) setKeepAlive(c net.Conn) error {
|
||||||
|
if sw.KeepAlivePeriod == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
tcpc, ok := c.(*net.TCPConn)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(sw.Address, "://", 2)[0])
|
||||||
|
}
|
||||||
|
if sw.KeepAlivePeriod.Duration == 0 {
|
||||||
|
return tcpc.SetKeepAlive(false)
|
||||||
|
}
|
||||||
|
if err := tcpc.SetKeepAlive(true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tcpc.SetKeepAlivePeriod(sw.KeepAlivePeriod.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
// Write writes the given metrics to the destination.
|
// Write writes the given metrics to the destination.
|
||||||
// If an error is encountered, it is up to the caller to retry the same write again later.
|
// If an error is encountered, it is up to the caller to retry the same write again later.
|
||||||
// Not parallel safe.
|
// Not parallel safe.
|
||||||
|
|
Loading…
Reference in New Issue