New Tcp Forwarder Output plugin.

This can be use to forward metric to a centralised endpoint (telegraf or not)

Sample use case:
    Considering a Pool of server with telegraf and tcp_forwarder
    enabled, who will send data to some other sever with tcp_listener.
    This allow to have a better security, as credential for "real"
    output is not in every server of the Pool, and will allow to rotate
    credential much easier.
This commit is contained in:
tuier 2016-07-20 13:22:54 +02:00
parent b905bc1b5d
commit e791b0ab11
4 changed files with 276 additions and 0 deletions

View File

@ -19,4 +19,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/tcp_forwarder"
)

View File

@ -0,0 +1,20 @@
# Tcp Forwarder Output Plugin
This plugin will send all metrics through TCP in the chosen format, this can be
use by example with tcp listener input plugin
```toml
[[outputs.tcp_forwarder]]
## TCP server/endpoint to send metrics to.
servers = ["localhost:8089"]
## timeout in seconds for the write connection
timeout = 2
## reconnect before every push
reconnect = false
## Data format to _output_.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

View File

@ -0,0 +1,159 @@
package tcp_forwarder
import (
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"io"
"log"
"net"
"strings"
"sync"
"time"
)
// TCPForwarder structure for configuration and server
type TCPForwarder struct {
sync.Mutex
Server string
Timeout internal.Duration
DataFormat string
Reconnect bool
conn net.Conn
serializer serializers.Serializer
}
var sampleConfig = `
## TCP servers/endpoints to send metrics to.
server = "localhost:8089"
## timeout for the write connection
timeout = "5s"
## force reconnection before every push
reconnect = false
## Data format to _output_.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
// SetSerializer is the function from output plugin to use a serializer for
// formating data
func (t *TCPForwarder) SetSerializer(serializer serializers.Serializer) {
t.serializer = serializer
}
// Connect is the default output plugin connection function who make sure it
// can connect to the endpoint
func (t *TCPForwarder) Connect() error {
if len(t.Server) == 0 {
t.Server = "localhost:8089"
}
if t.Timeout.Duration.Seconds() < 1 {
t.Timeout.Duration = time.Second
}
// try connect
if err := t.reconnect(); err != nil {
return err
}
return nil
}
func (t *TCPForwarder) reconnect() error {
if t.Reconnect {
t.Close()
}
if t.Reconnect || t.isClosed() {
conn, err := net.DialTimeout("tcp", t.Server, t.Timeout.Duration)
if err == nil {
fmt.Println("TCP_forwarder, re-connected: " + t.Server)
t.conn = conn
} else {
log.Printf("Error connecting to <%s>: %s", t.Server, err.Error())
return err
}
}
return nil
}
func (t *TCPForwarder) isClosed() bool {
var one []byte
if t.conn == nil {
return true
}
t.conn.SetReadDeadline(time.Now())
if _, err := t.conn.Read(one); err == io.EOF {
t.Close()
return true
}
return false
}
// Close is use to close connection to all Tcp endpoints
func (t *TCPForwarder) Close() error {
t.Lock()
defer t.Unlock()
if t.conn != nil {
t.conn.Close()
t.conn = nil
}
return nil
}
// SampleConfig is the default function who return the default configuration
// for tcp forwarder output
func (t *TCPForwarder) SampleConfig() string {
return sampleConfig
}
// Description is the default function who return the description of tcp
// forwarder output
func (t *TCPForwarder) Description() string {
return "Generic TCP forwarder for metrics"
}
// Write is the default function to call to "send" a metric through the Output
func (t *TCPForwarder) Write(metrics []telegraf.Metric) error {
// reconnect if needed
if err := t.reconnect(); err != nil {
return err
}
// Prepare data
t.Lock()
defer t.Unlock()
var bp []string
for _, metric := range metrics {
sMetrics, err := t.serializer.Serialize(metric)
if err != nil {
log.Printf("Error while serializing some metrics: %s", err.Error())
}
bp = append(bp, sMetrics...)
}
// TODO should we add a join function in serialiser ?
points := strings.Join(bp, "\n") + "\n"
t.conn.SetWriteDeadline(time.Now().Add(t.Timeout.Duration))
if _, e := fmt.Fprintf(t.conn, points); e != nil {
fmt.Println("ERROR: " + e.Error())
t.conn.Close()
t.conn = nil
return errors.New("Could not write to tcp endpoint\n")
}
return nil
}
func init() {
outputs.Add("tcp_forwarder", func() telegraf.Output {
return &TCPForwarder{}
})
}

View File

@ -0,0 +1,96 @@
package tcp_forwarder
import (
"bufio"
"fmt"
"log"
"net"
"net/textproto"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTCPForwarderError(t *testing.T) {
server := "127.0.0.1:8089"
// Init plugin
g := TCPForwarder{
Server: server,
serializer: &influx.InfluxSerializer{},
}
// Error
err := g.Connect()
assert.Equal(
t,
fmt.Sprintf("dial tcp %s: getsockopt: connection refused", server),
err.Error())
}
func TestTCPForwaderOK(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
TCPServer(t, &wg)
// Give the fake TCP server some time to start:
// Init plugin
g := TCPForwarder{
serializer: &influx.InfluxSerializer{},
}
// Init metrics
m1, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := telegraf.NewMetric(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1, m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver
wg.Wait()
g.Close()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:8089")
if err != nil {
log.Printf("Couldn't Listen to port 8089: %s\n", err)
return
}
go func() {
defer wg.Done()
conn, _ := tcpServer.Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "mymeasurement,host=192.168.0.1 myfield=3.14 1289430000000000000", data1)
data2, _ := tp.ReadLine()
assert.Equal(t, "mymeasurement,host=192.168.0.1 value=3.14 1289430000000000000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my_measurement,host=192.168.0.1 value=3.14 1289430000000000000", data3)
conn.Close()
}()
}