add keep-alive support to socket_listener & socket_writer (#2697)
closes #2635
This commit is contained in:
		
							parent
							
								
									3c28b93514
								
							
						
					
					
						commit
						8f5cd6c2ae
					
				|  | @ -74,6 +74,7 @@ be deprecated eventually. | ||||||
| - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks | - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks | ||||||
| - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. | - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. | ||||||
| - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin | - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin | ||||||
|  | - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer | ||||||
| 
 | 
 | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -36,6 +36,12 @@ This is a sample configuration for the plugin. | ||||||
|   ## Defaults to the OS default. |   ## Defaults to the OS default. | ||||||
|   # read_buffer_size = 65535 |   # read_buffer_size = 65535 | ||||||
| 
 | 
 | ||||||
|  |   ## Period between keep alive probes. | ||||||
|  |   ## Only applies to TCP sockets. | ||||||
|  |   ## 0 disables keep alive probes. | ||||||
|  |   ## Defaults to the OS configuration. | ||||||
|  |   # keep_alive_period = "5m" | ||||||
|  | 
 | ||||||
|   ## Data format to consume. |   ## Data format to consume. | ||||||
|   ## Each data format has it's own unique set of configuration options, read |   ## Each data format has it's own unique set of configuration options, read | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|  |  | ||||||
|  | @ -11,6 +11,7 @@ import ( | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdata/telegraf" | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
| 	"github.com/influxdata/telegraf/plugins/inputs" | 	"github.com/influxdata/telegraf/plugins/inputs" | ||||||
| 	"github.com/influxdata/telegraf/plugins/parsers" | 	"github.com/influxdata/telegraf/plugins/parsers" | ||||||
| ) | ) | ||||||
|  | @ -47,6 +48,11 @@ func (ssl *streamSocketListener) listen() { | ||||||
| 		} | 		} | ||||||
| 		ssl.connections[c.RemoteAddr().String()] = c | 		ssl.connections[c.RemoteAddr().String()] = c | ||||||
| 		ssl.connectionsMtx.Unlock() | 		ssl.connectionsMtx.Unlock() | ||||||
|  | 
 | ||||||
|  | 		if err := ssl.setKeepAlive(c); err != nil { | ||||||
|  | 			ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err)) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		go ssl.read(c) | 		go ssl.read(c) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -57,6 +63,23 @@ func (ssl *streamSocketListener) listen() { | ||||||
| 	ssl.connectionsMtx.Unlock() | 	ssl.connectionsMtx.Unlock() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error { | ||||||
|  | 	if ssl.KeepAlivePeriod == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	tcpc, ok := c.(*net.TCPConn) | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(ssl.ServiceAddress, "://", 2)[0]) | ||||||
|  | 	} | ||||||
|  | 	if ssl.KeepAlivePeriod.Duration == 0 { | ||||||
|  | 		return tcpc.SetKeepAlive(false) | ||||||
|  | 	} | ||||||
|  | 	if err := tcpc.SetKeepAlive(true); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return tcpc.SetKeepAlivePeriod(ssl.KeepAlivePeriod.Duration) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (ssl *streamSocketListener) removeConnection(c net.Conn) { | func (ssl *streamSocketListener) removeConnection(c net.Conn) { | ||||||
| 	ssl.connectionsMtx.Lock() | 	ssl.connectionsMtx.Lock() | ||||||
| 	delete(ssl.connections, c.RemoteAddr().String()) | 	delete(ssl.connections, c.RemoteAddr().String()) | ||||||
|  | @ -116,9 +139,10 @@ func (psl *packetSocketListener) listen() { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type SocketListener struct { | type SocketListener struct { | ||||||
| 	ServiceAddress string | 	ServiceAddress  string | ||||||
| 	MaxConnections int | 	MaxConnections  int | ||||||
| 	ReadBufferSize int | 	ReadBufferSize  int | ||||||
|  | 	KeepAlivePeriod *internal.Duration | ||||||
| 
 | 
 | ||||||
| 	parsers.Parser | 	parsers.Parser | ||||||
| 	telegraf.Accumulator | 	telegraf.Accumulator | ||||||
|  | @ -154,6 +178,12 @@ func (sl *SocketListener) SampleConfig() string { | ||||||
|   ## Defaults to the OS default. |   ## Defaults to the OS default. | ||||||
|   # read_buffer_size = 65535 |   # read_buffer_size = 65535 | ||||||
| 
 | 
 | ||||||
|  |   ## Period between keep alive probes. | ||||||
|  |   ## Only applies to TCP sockets. | ||||||
|  |   ## 0 disables keep alive probes. | ||||||
|  |   ## Defaults to the OS configuration. | ||||||
|  |   # keep_alive_period = "5m" | ||||||
|  | 
 | ||||||
|   ## Data format to consume. |   ## Data format to consume. | ||||||
|   ## Each data format has it's own unique set of configuration options, read |   ## Each data format has it's own unique set of configuration options, read | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|  |  | ||||||
|  | @ -19,6 +19,12 @@ It can output data in any of the [supported output formats](https://github.com/i | ||||||
|   # address = "unix:///tmp/telegraf.sock" |   # address = "unix:///tmp/telegraf.sock" | ||||||
|   # address = "unixgram:///tmp/telegraf.sock" |   # address = "unixgram:///tmp/telegraf.sock" | ||||||
| 
 | 
 | ||||||
|  |   ## Period between keep alive probes. | ||||||
|  |   ## Only applies to TCP sockets. | ||||||
|  |   ## 0 disables keep alive probes. | ||||||
|  |   ## Defaults to the OS configuration. | ||||||
|  |   # keep_alive_period = "5m" | ||||||
|  | 
 | ||||||
|   ## Data format to generate. |   ## Data format to generate. | ||||||
|   ## Each data format has it's own unique set of configuration options, read |   ## Each data format has it's own unique set of configuration options, read | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|  |  | ||||||
|  | @ -2,16 +2,19 @@ package socket_writer | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"log" | ||||||
| 	"net" | 	"net" | ||||||
| 	"strings" | 	"strings" | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdata/telegraf" | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
| 	"github.com/influxdata/telegraf/plugins/outputs" | 	"github.com/influxdata/telegraf/plugins/outputs" | ||||||
| 	"github.com/influxdata/telegraf/plugins/serializers" | 	"github.com/influxdata/telegraf/plugins/serializers" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type SocketWriter struct { | type SocketWriter struct { | ||||||
| 	Address string | 	Address         string | ||||||
|  | 	KeepAlivePeriod *internal.Duration | ||||||
| 
 | 
 | ||||||
| 	serializers.Serializer | 	serializers.Serializer | ||||||
| 
 | 
 | ||||||
|  | @ -36,6 +39,12 @@ func (sw *SocketWriter) SampleConfig() string { | ||||||
|   # address = "unix:///tmp/telegraf.sock" |   # address = "unix:///tmp/telegraf.sock" | ||||||
|   # address = "unixgram:///tmp/telegraf.sock" |   # address = "unixgram:///tmp/telegraf.sock" | ||||||
| 
 | 
 | ||||||
|  |   ## Period between keep alive probes. | ||||||
|  |   ## Only applies to TCP sockets. | ||||||
|  |   ## 0 disables keep alive probes. | ||||||
|  |   ## Defaults to the OS configuration. | ||||||
|  |   # keep_alive_period = "5m" | ||||||
|  | 
 | ||||||
|   ## Data format to generate. |   ## Data format to generate. | ||||||
|   ## Each data format has it's own unique set of configuration options, read |   ## Each data format has it's own unique set of configuration options, read | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|  | @ -59,10 +68,31 @@ func (sw *SocketWriter) Connect() error { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if err := sw.setKeepAlive(c); err != nil { | ||||||
|  | 		log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	sw.Conn = c | 	sw.Conn = c | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (sw *SocketWriter) setKeepAlive(c net.Conn) error { | ||||||
|  | 	if sw.KeepAlivePeriod == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	tcpc, ok := c.(*net.TCPConn) | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(sw.Address, "://", 2)[0]) | ||||||
|  | 	} | ||||||
|  | 	if sw.KeepAlivePeriod.Duration == 0 { | ||||||
|  | 		return tcpc.SetKeepAlive(false) | ||||||
|  | 	} | ||||||
|  | 	if err := tcpc.SetKeepAlive(true); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return tcpc.SetKeepAlivePeriod(sw.KeepAlivePeriod.Duration) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Write writes the given metrics to the destination.
 | // Write writes the given metrics to the destination.
 | ||||||
| // If an error is encountered, it is up to the caller to retry the same write again later.
 | // If an error is encountered, it is up to the caller to retry the same write again later.
 | ||||||
| // Not parallel safe.
 | // Not parallel safe.
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue