diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 28354e7e4..cd1fa3b23 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -19,4 +19,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" + _ "github.com/influxdata/telegraf/plugins/outputs/tcp_forwarder" ) diff --git a/plugins/outputs/tcp_forwarder/README.md b/plugins/outputs/tcp_forwarder/README.md new file mode 100644 index 000000000..a8b2e125a --- /dev/null +++ b/plugins/outputs/tcp_forwarder/README.md @@ -0,0 +1,20 @@ + +# Tcp Forwarder Output Plugin + +This plugin will send all metrics through TCP in the chosen format, this can be +use by example with tcp listener input plugin + +```toml +[[outputs.tcp_forwarder]] + ## TCP server/endpoint to send metrics to. + servers = ["localhost:8089"] + ## timeout in seconds for the write connection + timeout = 2 + ## reconnect before every push + reconnect = false + ## Data format to _output_. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` diff --git a/plugins/outputs/tcp_forwarder/tcp_forwarder.go b/plugins/outputs/tcp_forwarder/tcp_forwarder.go new file mode 100644 index 000000000..1d505de00 --- /dev/null +++ b/plugins/outputs/tcp_forwarder/tcp_forwarder.go @@ -0,0 +1,159 @@ +package tcp_forwarder + +import ( + "errors" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "io" + "log" + "net" + "strings" + "sync" + "time" +) + +// TCPForwarder structure for configuration and server +type TCPForwarder struct { + sync.Mutex + + Server string + Timeout internal.Duration + DataFormat string + Reconnect bool + conn net.Conn + serializer serializers.Serializer +} + +var sampleConfig = ` + ## TCP servers/endpoints to send metrics to. + server = "localhost:8089" + ## timeout for the write connection + timeout = "5s" + ## force reconnection before every push + reconnect = false + ## Data format to _output_. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +// SetSerializer is the function from output plugin to use a serializer for +// formating data +func (t *TCPForwarder) SetSerializer(serializer serializers.Serializer) { + t.serializer = serializer +} + +// Connect is the default output plugin connection function who make sure it +// can connect to the endpoint +func (t *TCPForwarder) Connect() error { + + if len(t.Server) == 0 { + t.Server = "localhost:8089" + } + if t.Timeout.Duration.Seconds() < 1 { + t.Timeout.Duration = time.Second + } + + // try connect + if err := t.reconnect(); err != nil { + return err + } + + return nil +} + +func (t *TCPForwarder) reconnect() error { + if t.Reconnect { + t.Close() + } + if t.Reconnect || t.isClosed() { + conn, err := net.DialTimeout("tcp", t.Server, t.Timeout.Duration) + if err == nil { + fmt.Println("TCP_forwarder, re-connected: " + t.Server) + t.conn = conn + } else { + log.Printf("Error connecting to <%s>: %s", t.Server, err.Error()) + return err + } + } + return nil +} + +func (t *TCPForwarder) isClosed() bool { + var one []byte + if t.conn == nil { + return true + } + + t.conn.SetReadDeadline(time.Now()) + if _, err := t.conn.Read(one); err == io.EOF { + t.Close() + return true + } + return false +} + +// Close is use to close connection to all Tcp endpoints +func (t *TCPForwarder) Close() error { + t.Lock() + defer t.Unlock() + if t.conn != nil { + t.conn.Close() + t.conn = nil + } + return nil +} + +// SampleConfig is the default function who return the default configuration +// for tcp forwarder output +func (t *TCPForwarder) SampleConfig() string { + return sampleConfig +} + +// Description is the default function who return the description of tcp +// forwarder output +func (t *TCPForwarder) Description() string { + return "Generic TCP forwarder for metrics" +} + +// Write is the default function to call to "send" a metric through the Output +func (t *TCPForwarder) Write(metrics []telegraf.Metric) error { + // reconnect if needed + if err := t.reconnect(); err != nil { + return err + } + // Prepare data + t.Lock() + defer t.Unlock() + + var bp []string + for _, metric := range metrics { + sMetrics, err := t.serializer.Serialize(metric) + if err != nil { + log.Printf("Error while serializing some metrics: %s", err.Error()) + } + bp = append(bp, sMetrics...) + } + + // TODO should we add a join function in serialiser ? + points := strings.Join(bp, "\n") + "\n" + + t.conn.SetWriteDeadline(time.Now().Add(t.Timeout.Duration)) + if _, e := fmt.Fprintf(t.conn, points); e != nil { + fmt.Println("ERROR: " + e.Error()) + t.conn.Close() + t.conn = nil + return errors.New("Could not write to tcp endpoint\n") + } + return nil +} + +func init() { + outputs.Add("tcp_forwarder", func() telegraf.Output { + return &TCPForwarder{} + }) +} diff --git a/plugins/outputs/tcp_forwarder/tcp_forwarder_test.go b/plugins/outputs/tcp_forwarder/tcp_forwarder_test.go new file mode 100644 index 000000000..987b69ab1 --- /dev/null +++ b/plugins/outputs/tcp_forwarder/tcp_forwarder_test.go @@ -0,0 +1,96 @@ +package tcp_forwarder + +import ( + "bufio" + "fmt" + "log" + "net" + "net/textproto" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/influx" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTCPForwarderError(t *testing.T) { + server := "127.0.0.1:8089" + // Init plugin + g := TCPForwarder{ + Server: server, + serializer: &influx.InfluxSerializer{}, + } + // Error + err := g.Connect() + assert.Equal( + t, + fmt.Sprintf("dial tcp %s: getsockopt: connection refused", server), + err.Error()) +} + +func TestTCPForwaderOK(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + TCPServer(t, &wg) + // Give the fake TCP server some time to start: + // Init plugin + g := TCPForwarder{ + serializer: &influx.InfluxSerializer{}, + } + // Init metrics + m1, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := telegraf.NewMetric( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1, m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + err2 := g.Write(metrics) + require.NoError(t, err2) + // Waiting TCPserver + wg.Wait() + g.Close() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup) { + tcpServer, err := net.Listen("tcp", "127.0.0.1:8089") + if err != nil { + log.Printf("Couldn't Listen to port 8089: %s\n", err) + return + } + go func() { + defer wg.Done() + conn, _ := tcpServer.Accept() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "mymeasurement,host=192.168.0.1 myfield=3.14 1289430000000000000", data1) + data2, _ := tp.ReadLine() + assert.Equal(t, "mymeasurement,host=192.168.0.1 value=3.14 1289430000000000000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my_measurement,host=192.168.0.1 value=3.14 1289430000000000000", data3) + conn.Close() + }() +}