telegraf/plugins/inputs/socket_listener/socket_listener.go

344 lines
8.2 KiB
Go

package socket_listener
import (
"bufio"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"time"
"crypto/tls"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
// setReadBufferer is satisfied by any connection exposing SetReadBuffer.
// The listeners type-assert their sockets against it before applying the
// configured read buffer size.
type setReadBufferer interface {
SetReadBuffer(bytes int) error
}
// streamSocketListener serves connection-oriented sockets. It embeds the
// net.Listener it accepts connections from and the owning SocketListener,
// whose settings, parser and accumulator it uses.
type streamSocketListener struct {
net.Listener
*SocketListener
// sockType is the protocol part of the service address; it is only used
// in log messages.
sockType string
// connections tracks the accepted connections, keyed by the string form
// of their remote address.
connections map[string]net.Conn
// connectionsMtx guards connections.
connectionsMtx sync.Mutex
}
// listen accepts connections until Accept fails and starts one read
// goroutine per accepted connection. Before returning it closes every
// connection that is still tracked.
//
// An Accept error is reported to the accumulator unless it is the
// "use of closed network connection" error produced by closing the listener.
// Connections arriving while MaxConnections (when > 0) is already reached are
// closed immediately.
func (ssl *streamSocketListener) listen() {
	ssl.connections = map[string]net.Conn{}

	for {
		c, err := ssl.Accept()
		if err != nil {
			// Closing the listener is the normal way to stop this loop, so
			// that particular error is not reported.
			if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
				ssl.AddError(err)
			}
			break
		}

		if ssl.ReadBufferSize > 0 {
			if srb, ok := c.(setReadBufferer); ok {
				// A failure here is not fatal for the connection, but it must
				// not go unnoticed: the configured size is not in effect.
				if err := srb.SetReadBuffer(ssl.ReadBufferSize); err != nil {
					log.Printf("W! Unable to set read buffer on a %s socket: %s", ssl.sockType, err)
				}
			} else {
				log.Printf("W! Unable to set read buffer on a %s socket", ssl.sockType)
			}
		}

		ssl.connectionsMtx.Lock()
		if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections {
			ssl.connectionsMtx.Unlock()
			c.Close()
			continue
		}
		// NOTE(review): the key is the remote address string; peers reporting
		// identical addresses (possibly unnamed unix-socket clients — confirm)
		// would overwrite each other's entry.
		ssl.connections[c.RemoteAddr().String()] = c
		ssl.connectionsMtx.Unlock()

		if err := ssl.setKeepAlive(c); err != nil {
			ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
		}

		go ssl.read(c)
	}

	// The accept loop is over: close whatever is still open so the read
	// goroutines terminate.
	ssl.connectionsMtx.Lock()
	for _, c := range ssl.connections {
		c.Close()
	}
	ssl.connectionsMtx.Unlock()
}
// setKeepAlive applies the configured keep-alive period to c.
//
// With no period configured nothing is changed and nil is returned. A period
// of zero disables keep-alive probes; any other value enables them with that
// period. An error is returned when c is not a *net.TCPConn or when one of
// the socket calls fails.
func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
	period := ssl.KeepAlivePeriod
	if period == nil {
		return nil
	}

	tcpConn, isTCP := c.(*net.TCPConn)
	if !isTCP {
		scheme := strings.SplitN(ssl.ServiceAddress, "://", 2)[0]
		return fmt.Errorf("cannot set keep alive on a %s socket", scheme)
	}

	if period.Duration == 0 {
		return tcpConn.SetKeepAlive(false)
	}

	if err := tcpConn.SetKeepAlive(true); err != nil {
		return err
	}
	return tcpConn.SetKeepAlivePeriod(period.Duration)
}
// removeConnection drops the entry stored under c's remote address from the
// connection table while holding the table's mutex.
func (ssl *streamSocketListener) removeConnection(c net.Conn) {
	ssl.connectionsMtx.Lock()
	defer ssl.connectionsMtx.Unlock()

	key := c.RemoteAddr().String()
	delete(ssl.connections, key)
}
// read consumes c token by token using a bufio.Scanner with its default
// split function, parses every token and forwards the resulting metrics to
// the accumulator. On return the connection is closed and removed from the
// connection table.
//
// A token that fails to parse is reported and skipped. When the scanner
// stops with an error, a timeout is logged at debug level, the
// "use of closed network connection" error is ignored, and every other
// error is reported to the accumulator.
func (ssl *streamSocketListener) read(c net.Conn) {
	defer ssl.removeConnection(c)
	defer c.Close()

	scnr := bufio.NewScanner(c)
	for {
		// Push the deadline forward before every read so that the timeout
		// measures inactivity rather than total connection lifetime.
		if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 {
			c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration))
		}
		if !scnr.Scan() {
			break
		}

		metrics, err := ssl.Parse(scnr.Bytes())
		if err != nil {
			ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
			//TODO rate limit
			continue
		}
		for _, m := range metrics {
			ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
		}
	}

	if err := scnr.Err(); err != nil {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			log.Printf("D! Timeout in plugin [input.socket_listener]: %s", err)
		} else if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
			// Scanner errors that are not net.Error values (for example
			// bufio.ErrTooLong) must be reported as well; otherwise the
			// connection is dropped without any trace.
			ssl.AddError(err)
		}
	}
}
// packetSocketListener serves datagram sockets. It embeds the net.PacketConn
// it reads packets from and the owning SocketListener, whose parser and
// accumulator it uses.
type packetSocketListener struct {
net.PacketConn
*SocketListener
}
// listen reads packets until ReadFrom fails, parsing each packet and
// forwarding the resulting metrics to the accumulator.
//
// A read error ends the loop; it is reported unless it is the
// "use of closed network connection" error. A packet that fails to parse is
// reported and skipped.
func (psl *packetSocketListener) listen() {
	// 64kb - maximum size of IP packet
	const maxPacketSize = 64 * 1024
	packet := make([]byte, maxPacketSize)

	for {
		size, _, readErr := psl.ReadFrom(packet)
		if readErr != nil {
			if !strings.HasSuffix(readErr.Error(), ": use of closed network connection") {
				psl.AddError(readErr)
			}
			return
		}

		parsed, parseErr := psl.Parse(packet[:size])
		if parseErr != nil {
			psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", parseErr))
			//TODO rate limit
			continue
		}

		for _, metric := range parsed {
			psl.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
		}
	}
}
// SocketListener is the socket_listener input plugin. Start opens the socket
// described by ServiceAddress and feeds everything received through the
// embedded Parser into the embedded Accumulator.
type SocketListener struct {
// ServiceAddress is the "<protocol>://<address>" URL to listen on.
ServiceAddress string `toml:"service_address"`
// MaxConnections limits concurrent stream connections; values <= 0 mean
// no limit.
MaxConnections int `toml:"max_connections"`
// ReadBufferSize is applied to the socket's read buffer when > 0.
ReadBufferSize int `toml:"read_buffer_size"`
// ReadTimeout is the read deadline renewed before each read on a stream
// connection; nil or a non-positive duration means no deadline is set.
ReadTimeout *internal.Duration `toml:"read_timeout"`
// KeepAlivePeriod configures keep-alive probes on stream connections: nil
// leaves the socket untouched, zero disables the probes.
KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"`
tlsint.ServerConfig
parsers.Parser
telegraf.Accumulator
// Closer shuts down the active listener; it is set by Start and cleared
// by Stop.
io.Closer
}
// Description returns a one-line summary of the plugin.
func (sl *SocketListener) Description() string {
	const description = "Generic socket listener capable of handling multiple socket types."
	return description
}
// SampleConfig returns the commented example configuration of the plugin.
func (sl *SocketListener) SampleConfig() string {
	const sampleConfig = `
## URL to listen on
# service_address = "tcp://:8094"
# service_address = "tcp://127.0.0.1:http"
# service_address = "tcp4://:8094"
# service_address = "tcp6://:8094"
# service_address = "tcp6://[2001:db8::1]:8094"
# service_address = "udp://:8094"
# service_address = "udp4://:8094"
# service_address = "udp6://:8094"
# service_address = "unix:///tmp/telegraf.sock"
# service_address = "unixgram:///tmp/telegraf.sock"
## Maximum number of concurrent connections.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# max_connections = 1024
## Read timeout.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# read_timeout = "30s"
## Optional TLS configuration.
## Only applies to stream sockets (e.g. TCP).
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Maximum socket buffer size in bytes.
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = 65535
## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
`
	return sampleConfig
}
// Gather does nothing and always returns nil: metrics are pushed to the
// accumulator by the listener goroutines launched in Start.
func (sl *SocketListener) Gather(_ telegraf.Accumulator) error {
	return nil
}
// SetParser replaces the parser used to decode incoming data.
func (sl *SocketListener) SetParser(parser parsers.Parser) {
	sl.Parser = parser
}
// Start opens the socket described by ServiceAddress and launches the
// goroutine serving it. Metrics are delivered to acc.
//
// ServiceAddress must have the form "<protocol>://<address>". Stream
// protocols (tcp*, unix, unixpacket) are wrapped in TLS when the embedded
// ServerConfig yields a TLS configuration; packet protocols (udp*, ip*,
// unixgram) are read datagram by datagram. For unix-domain protocols the
// socket file is removed before listening and again when the listener is
// closed.
//
// An error is returned when the address is malformed, the protocol is
// unknown, the TLS configuration is invalid, or the socket cannot be opened.
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
	sl.Accumulator = acc

	spl := strings.SplitN(sl.ServiceAddress, "://", 2)
	if len(spl) != 2 {
		return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
	}
	protocol, address := spl[0], spl[1]

	isUnix := protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram"
	if isUnix {
		// no good way of testing for "file does not exist".
		// Instead just ignore error and blow up when we try to listen, which will
		// indicate "address already in use" if file existed and we couldn't remove.
		os.Remove(address)
	}

	switch protocol {
	case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
		tlsCfg, err := sl.ServerConfig.TLSConfig()
		if err != nil {
			// Surface the broken TLS configuration instead of reporting a
			// successful start with no listener running.
			return err
		}

		var l net.Listener
		if tlsCfg == nil {
			l, err = net.Listen(protocol, address)
		} else {
			l, err = tls.Listen(protocol, address, tlsCfg)
		}
		if err != nil {
			return err
		}

		ssl := &streamSocketListener{
			Listener:       l,
			SocketListener: sl,
			sockType:       protocol,
		}

		sl.Closer = ssl
		go ssl.listen()
	case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
		pc, err := net.ListenPacket(protocol, address)
		if err != nil {
			return err
		}

		if sl.ReadBufferSize > 0 {
			if srb, ok := pc.(setReadBufferer); ok {
				// Not fatal, but the configured size is not in effect, so
				// say so.
				if err := srb.SetReadBuffer(sl.ReadBufferSize); err != nil {
					log.Printf("W! Unable to set read buffer on a %s socket: %s", protocol, err)
				}
			} else {
				log.Printf("W! Unable to set read buffer on a %s socket", protocol)
			}
		}

		psl := &packetSocketListener{
			PacketConn:     pc,
			SocketListener: sl,
		}

		sl.Closer = psl
		go psl.listen()
	default:
		return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
	}

	if isUnix {
		// Make closing the listener also delete the socket file.
		sl.Closer = unixCloser{path: address, closer: sl.Closer}
	}

	return nil
}
// Stop closes the active listener, if there is one, and clears it so that a
// repeated call does nothing.
func (sl *SocketListener) Stop() {
	if sl.Closer == nil {
		return
	}
	sl.Closer.Close()
	sl.Closer = nil
}
// newSocketListener returns a SocketListener preconfigured with the parser
// produced by parsers.NewInfluxParser; the error from that constructor is
// discarded.
func newSocketListener() *SocketListener {
	parser, _ := parsers.NewInfluxParser()

	sl := &SocketListener{}
	sl.Parser = parser
	return sl
}
// unixCloser wraps another io.Closer and additionally deletes the file at
// path once the wrapped closer has been closed.
type unixCloser struct {
	path   string
	closer io.Closer
}

// Close closes the wrapped closer, then removes the file at path. Only the
// wrapped closer's error is returned; a failure to remove the file is
// deliberately ignored.
func (uc unixCloser) Close() error {
	closeErr := uc.closer.Close()

	// Best effort: the file may already be gone.
	_ = os.Remove(uc.path)

	return closeErr
}
// init registers the plugin under the name "socket_listener".
func init() {
	inputs.Add("socket_listener", func() telegraf.Input {
		return newSocketListener()
	})
}