add socket listener & writer (#2094)
closes #1516 closes #1711 closes #1721 closes #1526
This commit is contained in:
committed by
Cameron Sparr
parent
2a32cba35b
commit
510b750da4
@@ -21,4 +21,5 @@ import (
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
|
||||
)
|
||||
|
||||
106
plugins/outputs/socket_writer/socket_writer.go
Normal file
106
plugins/outputs/socket_writer/socket_writer.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package socket_writer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
)
|
||||
|
||||
type SocketWriter struct {
|
||||
Address string
|
||||
|
||||
serializers.Serializer
|
||||
|
||||
net.Conn
|
||||
}
|
||||
|
||||
func (sw *SocketWriter) Description() string {
|
||||
return "Generic socket writer capable of handling multiple socket types."
|
||||
}
|
||||
|
||||
func (sw *SocketWriter) SampleConfig() string {
|
||||
return `
|
||||
## URL to connect to
|
||||
# address = "tcp://127.0.0.1:8094"
|
||||
# address = "tcp://example.com:http"
|
||||
# address = "tcp4://127.0.0.1:8094"
|
||||
# address = "tcp6://127.0.0.1:8094"
|
||||
# address = "tcp6://[2001:db8::1]:8094"
|
||||
# address = "udp://127.0.0.1:8094"
|
||||
# address = "udp4://127.0.0.1:8094"
|
||||
# address = "udp6://127.0.0.1:8094"
|
||||
# address = "unix:///tmp/telegraf.sock"
|
||||
# address = "unixgram:///tmp/telegraf.sock"
|
||||
|
||||
## Data format to generate.
|
||||
## Each data format has it's own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
# data_format = "influx"
|
||||
`
|
||||
}
|
||||
|
||||
func (sw *SocketWriter) SetSerializer(s serializers.Serializer) {
|
||||
sw.Serializer = s
|
||||
}
|
||||
|
||||
func (sw *SocketWriter) Connect() error {
|
||||
spl := strings.SplitN(sw.Address, "://", 2)
|
||||
if len(spl) != 2 {
|
||||
return fmt.Errorf("invalid address: %s", sw.Address)
|
||||
}
|
||||
|
||||
c, err := net.Dial(spl[0], spl[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sw.Conn = c
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write writes the given metrics to the destination.
|
||||
// If an error is encountered, it is up to the caller to retry the same write again later.
|
||||
// Not parallel safe.
|
||||
func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
|
||||
if sw.Conn == nil {
|
||||
// previous write failed with permanent error and socket was closed.
|
||||
if err := sw.Connect(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range metrics {
|
||||
bs, err := sw.Serialize(m)
|
||||
if err != nil {
|
||||
//TODO log & keep going with remaining metrics
|
||||
return err
|
||||
}
|
||||
if _, err := sw.Conn.Write(bs); err != nil {
|
||||
//TODO log & keep going with remaining strings
|
||||
if err, ok := err.(net.Error); !ok || !err.Temporary() {
|
||||
// permanent error. close the connection
|
||||
sw.Close()
|
||||
sw.Conn = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newSocketWriter() *SocketWriter {
|
||||
s, _ := serializers.NewInfluxSerializer()
|
||||
return &SocketWriter{
|
||||
Serializer: s,
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("socket_writer", func() telegraf.Output { return newSocketWriter() })
|
||||
}
|
||||
187
plugins/outputs/socket_writer/socket_writer_test.go
Normal file
187
plugins/outputs/socket_writer/socket_writer_test.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package socket_writer
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSocketWriter_tcp(t *testing.T) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "tcp://" + listener.Addr().String()
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
lconn, err := listener.Accept()
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketWriter_stream(t, sw, lconn)
|
||||
}
|
||||
|
||||
func TestSocketWriter_udp(t *testing.T) {
|
||||
listener, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "udp://" + listener.LocalAddr().String()
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketWriter_packet(t, sw, listener)
|
||||
}
|
||||
|
||||
func TestSocketWriter_unix(t *testing.T) {
|
||||
defer os.Remove("/tmp/telegraf_test.sock")
|
||||
listener, err := net.Listen("unix", "/tmp/telegraf_test.sock")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "unix:///tmp/telegraf_test.sock"
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
lconn, err := listener.Accept()
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketWriter_stream(t, sw, lconn)
|
||||
}
|
||||
|
||||
func TestSocketWriter_unixgram(t *testing.T) {
|
||||
defer os.Remove("/tmp/telegraf_test.sock")
|
||||
listener, err := net.ListenPacket("unixgram", "/tmp/telegraf_test.sock")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "unixgram:///tmp/telegraf_test.sock"
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketWriter_packet(t, sw, listener)
|
||||
}
|
||||
|
||||
func testSocketWriter_stream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
|
||||
metrics := []telegraf.Metric{}
|
||||
metrics = append(metrics, testutil.TestMetric(1, "test"))
|
||||
mbs1out, _ := sw.Serialize(metrics[0])
|
||||
metrics = append(metrics, testutil.TestMetric(2, "test"))
|
||||
mbs2out, _ := sw.Serialize(metrics[1])
|
||||
|
||||
err := sw.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
scnr := bufio.NewScanner(lconn)
|
||||
require.True(t, scnr.Scan())
|
||||
mstr1in := scnr.Text() + "\n"
|
||||
require.True(t, scnr.Scan())
|
||||
mstr2in := scnr.Text() + "\n"
|
||||
|
||||
assert.Equal(t, string(mbs1out), mstr1in)
|
||||
assert.Equal(t, string(mbs2out), mstr2in)
|
||||
}
|
||||
|
||||
func testSocketWriter_packet(t *testing.T, sw *SocketWriter, lconn net.PacketConn) {
|
||||
metrics := []telegraf.Metric{}
|
||||
metrics = append(metrics, testutil.TestMetric(1, "test"))
|
||||
mbs1out, _ := sw.Serialize(metrics[0])
|
||||
metrics = append(metrics, testutil.TestMetric(2, "test"))
|
||||
mbs2out, _ := sw.Serialize(metrics[1])
|
||||
|
||||
err := sw.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
buf := make([]byte, 256)
|
||||
var mstrins []string
|
||||
for len(mstrins) < 2 {
|
||||
n, _, err := lconn.ReadFrom(buf)
|
||||
require.NoError(t, err)
|
||||
for _, bs := range bytes.Split(buf[:n], []byte{'\n'}) {
|
||||
if len(bs) == 0 {
|
||||
continue
|
||||
}
|
||||
mstrins = append(mstrins, string(bs)+"\n")
|
||||
}
|
||||
}
|
||||
require.Len(t, mstrins, 2)
|
||||
|
||||
assert.Equal(t, string(mbs1out), mstrins[0])
|
||||
assert.Equal(t, string(mbs2out), mstrins[1])
|
||||
}
|
||||
|
||||
func TestSocketWriter_Write_err(t *testing.T) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "tcp://" + listener.Addr().String()
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
sw.Conn.(*net.TCPConn).SetReadBuffer(256)
|
||||
|
||||
lconn, err := listener.Accept()
|
||||
require.NoError(t, err)
|
||||
lconn.(*net.TCPConn).SetWriteBuffer(256)
|
||||
|
||||
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
|
||||
|
||||
// close the socket to generate an error
|
||||
lconn.Close()
|
||||
sw.Close()
|
||||
err = sw.Write(metrics)
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, sw.Conn)
|
||||
}
|
||||
|
||||
func TestSocketWriter_Write_reconnect(t *testing.T) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
sw := newSocketWriter()
|
||||
sw.Address = "tcp://" + listener.Addr().String()
|
||||
|
||||
err = sw.Connect()
|
||||
require.NoError(t, err)
|
||||
sw.Conn.(*net.TCPConn).SetReadBuffer(256)
|
||||
|
||||
lconn, err := listener.Accept()
|
||||
require.NoError(t, err)
|
||||
lconn.(*net.TCPConn).SetWriteBuffer(256)
|
||||
lconn.Close()
|
||||
sw.Conn = nil
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
var lerr error
|
||||
go func() {
|
||||
lconn, lerr = listener.Accept()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
|
||||
err = sw.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
wg.Wait()
|
||||
assert.NoError(t, lerr)
|
||||
|
||||
mbsout, _ := sw.Serialize(metrics[0])
|
||||
buf := make([]byte, 256)
|
||||
n, err := lconn.Read(buf)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, string(mbsout), string(buf[:n]))
|
||||
}
|
||||
Reference in New Issue
Block a user