2017-02-02 16:24:03 +00:00
|
|
|
package socket_writer
|
|
|
|
|
|
|
|
import (
|
2019-06-04 00:34:48 +00:00
|
|
|
"crypto/tls"
|
2017-02-02 16:24:03 +00:00
|
|
|
"fmt"
|
2017-04-24 20:14:42 +00:00
|
|
|
"log"
|
2017-02-02 16:24:03 +00:00
|
|
|
"net"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
2017-04-24 20:14:42 +00:00
|
|
|
"github.com/influxdata/telegraf/internal"
|
2018-05-04 23:33:23 +00:00
|
|
|
tlsint "github.com/influxdata/telegraf/internal/tls"
|
2017-02-02 16:24:03 +00:00
|
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
|
|
"github.com/influxdata/telegraf/plugins/serializers"
|
|
|
|
)
|
|
|
|
|
|
|
|
type SocketWriter struct {
|
2020-04-30 20:21:34 +00:00
|
|
|
ContentEncoding string `toml:"content_encoding"`
|
2018-05-04 23:33:23 +00:00
|
|
|
Address string
|
|
|
|
KeepAlivePeriod *internal.Duration
|
|
|
|
tlsint.ClientConfig
|
2017-02-02 16:24:03 +00:00
|
|
|
|
|
|
|
serializers.Serializer
|
|
|
|
|
2020-04-30 20:21:34 +00:00
|
|
|
encoder internal.ContentEncoder
|
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
net.Conn
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sw *SocketWriter) Description() string {
|
|
|
|
return "Generic socket writer capable of handling multiple socket types."
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sw *SocketWriter) SampleConfig() string {
|
|
|
|
return `
|
|
|
|
## URL to connect to
|
|
|
|
# address = "tcp://127.0.0.1:8094"
|
|
|
|
# address = "tcp://example.com:http"
|
|
|
|
# address = "tcp4://127.0.0.1:8094"
|
|
|
|
# address = "tcp6://127.0.0.1:8094"
|
|
|
|
# address = "tcp6://[2001:db8::1]:8094"
|
|
|
|
# address = "udp://127.0.0.1:8094"
|
|
|
|
# address = "udp4://127.0.0.1:8094"
|
|
|
|
# address = "udp6://127.0.0.1:8094"
|
|
|
|
# address = "unix:///tmp/telegraf.sock"
|
|
|
|
# address = "unixgram:///tmp/telegraf.sock"
|
|
|
|
|
2018-05-04 23:33:23 +00:00
|
|
|
## Optional TLS Config
|
|
|
|
# tls_ca = "/etc/telegraf/ca.pem"
|
|
|
|
# tls_cert = "/etc/telegraf/cert.pem"
|
|
|
|
# tls_key = "/etc/telegraf/key.pem"
|
|
|
|
## Use TLS but skip chain & host verification
|
2018-04-18 00:02:04 +00:00
|
|
|
# insecure_skip_verify = false
|
|
|
|
|
2017-04-24 20:14:42 +00:00
|
|
|
## Period between keep alive probes.
|
|
|
|
## Only applies to TCP sockets.
|
|
|
|
## 0 disables keep alive probes.
|
|
|
|
## Defaults to the OS configuration.
|
|
|
|
# keep_alive_period = "5m"
|
|
|
|
|
2020-04-30 20:21:34 +00:00
|
|
|
## Content encoding for packet-based connections (i.e. UDP, unixgram).
|
|
|
|
## Can be set to "gzip" or to "identity" to apply no encoding.
|
|
|
|
##
|
|
|
|
# content_encoding = "identity"
|
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
## Data format to generate.
|
2017-04-27 21:59:18 +00:00
|
|
|
## Each data format has its own unique set of configuration options, read
|
2017-02-02 16:24:03 +00:00
|
|
|
## more about them here:
|
|
|
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
|
|
|
# data_format = "influx"
|
|
|
|
`
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sw *SocketWriter) SetSerializer(s serializers.Serializer) {
|
|
|
|
sw.Serializer = s
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sw *SocketWriter) Connect() error {
|
|
|
|
spl := strings.SplitN(sw.Address, "://", 2)
|
|
|
|
if len(spl) != 2 {
|
|
|
|
return fmt.Errorf("invalid address: %s", sw.Address)
|
|
|
|
}
|
|
|
|
|
2018-05-04 23:33:23 +00:00
|
|
|
tlsCfg, err := sw.ClientConfig.TLSConfig()
|
2018-04-18 00:02:04 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-04-18 00:48:30 +00:00
|
|
|
var c net.Conn
|
2018-04-18 00:02:04 +00:00
|
|
|
if tlsCfg == nil {
|
|
|
|
c, err = net.Dial(spl[0], spl[1])
|
|
|
|
} else {
|
|
|
|
c, err = tls.Dial(spl[0], spl[1], tlsCfg)
|
|
|
|
}
|
2017-02-02 16:24:03 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-04-24 20:14:42 +00:00
|
|
|
if err := sw.setKeepAlive(c); err != nil {
|
|
|
|
log.Printf("unable to configure keep alive (%s): %s", sw.Address, err)
|
|
|
|
}
|
2020-04-30 20:21:34 +00:00
|
|
|
//set encoder
|
|
|
|
sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-04-24 20:14:42 +00:00
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
sw.Conn = c
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-24 20:14:42 +00:00
|
|
|
func (sw *SocketWriter) setKeepAlive(c net.Conn) error {
|
|
|
|
if sw.KeepAlivePeriod == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
tcpc, ok := c.(*net.TCPConn)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(sw.Address, "://", 2)[0])
|
|
|
|
}
|
|
|
|
if sw.KeepAlivePeriod.Duration == 0 {
|
|
|
|
return tcpc.SetKeepAlive(false)
|
|
|
|
}
|
|
|
|
if err := tcpc.SetKeepAlive(true); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return tcpc.SetKeepAlivePeriod(sw.KeepAlivePeriod.Duration)
|
|
|
|
}
|
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
// Write writes the given metrics to the destination.
|
|
|
|
// If an error is encountered, it is up to the caller to retry the same write again later.
|
|
|
|
// Not parallel safe.
|
|
|
|
func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
|
|
|
|
if sw.Conn == nil {
|
|
|
|
// previous write failed with permanent error and socket was closed.
|
|
|
|
if err := sw.Connect(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, m := range metrics {
|
|
|
|
bs, err := sw.Serialize(m)
|
|
|
|
if err != nil {
|
2019-06-04 00:34:48 +00:00
|
|
|
log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err)
|
|
|
|
continue
|
2017-02-02 16:24:03 +00:00
|
|
|
}
|
2020-04-30 20:21:34 +00:00
|
|
|
|
|
|
|
bs, err = sw.encoder.Encode(bs)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("D! [outputs.socket_writer] Could not encode metric: %v", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
if _, err := sw.Conn.Write(bs); err != nil {
|
|
|
|
//TODO log & keep going with remaining strings
|
|
|
|
if err, ok := err.(net.Error); !ok || !err.Temporary() {
|
|
|
|
// permanent error. close the connection
|
|
|
|
sw.Close()
|
|
|
|
sw.Conn = nil
|
2018-05-29 23:10:27 +00:00
|
|
|
return fmt.Errorf("closing connection: %v", err)
|
2017-02-02 16:24:03 +00:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-05-02 18:06:49 +00:00
|
|
|
// Close closes the connection. Noop if already closed.
|
|
|
|
func (sw *SocketWriter) Close() error {
|
|
|
|
if sw.Conn == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
err := sw.Conn.Close()
|
|
|
|
sw.Conn = nil
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-02-02 16:24:03 +00:00
|
|
|
func newSocketWriter() *SocketWriter {
|
|
|
|
s, _ := serializers.NewInfluxSerializer()
|
|
|
|
return &SocketWriter{
|
|
|
|
Serializer: s,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
outputs.Add("socket_writer", func() telegraf.Output { return newSocketWriter() })
|
|
|
|
}
|