diff --git a/CHANGELOG.md b/CHANGELOG.md index 73a942730..fe87e41dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! +- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 262de37ac..2808ce2b5 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -47,6 +47,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/system" + _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" diff --git a/plugins/inputs/tcp_listener/README.md b/plugins/inputs/tcp_listener/README.md new file mode 100644 index 000000000..63a7dea3c --- /dev/null +++ b/plugins/inputs/tcp_listener/README.md @@ -0,0 +1,30 @@ +# TCP listener service input plugin + +The TCP listener is a service input plugin that listens for messages on a TCP +socket and adds those messages to InfluxDB. +The plugin expects messages in the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +# Generic TCP listener +[[inputs.tcp_listener]] + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go new file mode 100644 index 000000000..dd239fedf --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -0,0 +1,264 @@ +package tcp_listener + +import ( + "bufio" + "fmt" + "log" + "net" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type TcpListener struct { + ServiceAddress string + AllowedPendingMessages int + MaxTCPConnections int `toml:"max_tcp_connections"` + + sync.Mutex + // Lock for preventing a data race during resource cleanup + cleanup sync.Mutex + wg sync.WaitGroup + + in chan []byte + done chan struct{} + // accept channel tracks how many active connections there are, if there + // is an available bool in accept, then we are below the maximum and can + // accept the connection + accept chan bool + + // track the listener here so we can close it in Stop() + listener *net.TCPListener + // track current connections so we can close them in Stop() + conns map[string]*net.TCPConn + + parser parsers.Parser + acc telegraf.Accumulator +} + +var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + + "You may want to increase allowed_pending_messages in the config\n" + +const sampleConfig = ` + ## Address and port to host TCP listener on + service_address = ":8094" + + ## Number of TCP messages allowed to queue up. Once filled, the + ## TCP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## Maximum number of concurrent TCP connections to allow + max_tcp_connections = 250 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (t *TcpListener) SampleConfig() string { + return sampleConfig +} + +func (t *TcpListener) Description() string { + return "Generic TCP listener" +} + +// All the work is done in the Start() function, so this is just a dummy +// function. +func (t *TcpListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (t *TcpListener) SetParser(parser parsers.Parser) { + t.parser = parser +} + +// Start starts the tcp listener service. +func (t *TcpListener) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + t.in = make(chan []byte, t.AllowedPendingMessages) + t.done = make(chan struct{}) + t.accept = make(chan bool, t.MaxTCPConnections) + t.conns = make(map[string]*net.TCPConn) + for i := 0; i < t.MaxTCPConnections; i++ { + t.accept <- true + } + + // Start listener + var err error + address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress) + t.listener, err = net.ListenTCP("tcp", address) + if err != nil { + log.Fatalf("ERROR: ListenUDP - %s", err) + return err + } + log.Println("TCP server listening on: ", t.listener.Addr().String()) + + t.wg.Add(2) + go t.tcpListen() + go t.tcpParser() + + log.Printf("Started TCP listener service on %s\n", t.ServiceAddress) + return nil +} + +// Stop cleans up all resources +func (t *TcpListener) Stop() { + t.Lock() + defer t.Unlock() + close(t.done) + t.listener.Close() + + // Close all open TCP connections + // - get all conns from the t.conns map and put into slice + // - this is so the forget() function doesnt conflict with looping + // over the t.conns map + var conns []*net.TCPConn + t.cleanup.Lock() + for _, conn := range t.conns { + conns = append(conns, conn) + } + t.cleanup.Unlock() + for _, conn := range conns { + conn.Close() + } + + t.wg.Wait() + close(t.in) + log.Println("Stopped TCP listener service on ", t.ServiceAddress) +} + +// tcpListen listens for incoming TCP connections. +func (t *TcpListener) tcpListen() error { + defer t.wg.Done() + + for { + select { + case <-t.done: + return nil + default: + // Accept connection: + conn, err := t.listener.AcceptTCP() + if err != nil { + return err + } + + log.Printf("Received TCP Connection from %s", conn.RemoteAddr()) + + select { + case <-t.accept: + // not over connection limit, handle the connection properly. + t.wg.Add(1) + // generate a random id for this TCPConn + id := internal.RandomString(6) + t.remember(id, conn) + go t.handler(conn, id) + default: + // We are over the connection limit, refuse & close. + t.refuser(conn) + } + } + } +} + +// refuser refuses a TCP connection +func (t *TcpListener) refuser(conn *net.TCPConn) { + // Tell the connection why we are closing. + fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+ + " reached, closing.\nYou may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", t.MaxTCPConnections) + conn.Close() + log.Printf("Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("WARNING: Maximum TCP Connections reached, you may want to" + + " adjust max_tcp_connections") +} + +// handler handles a single TCP Connection +func (t *TcpListener) handler(conn *net.TCPConn, id string) { + // connection cleanup function + defer func() { + t.wg.Done() + conn.Close() + log.Printf("Closed TCP Connection from %s", conn.RemoteAddr()) + // Add one connection potential back to channel when this one closes + t.accept <- true + t.forget(id) + }() + + scanner := bufio.NewScanner(conn) + for { + select { + case <-t.done: + return + default: + if !scanner.Scan() { + return + } + buf := scanner.Bytes() + select { + case t.in <- buf: + default: + log.Printf(dropwarn, string(buf)) + } + } + } +} + +// tcpParser parses the incoming tcp byte packets +func (t *TcpListener) tcpParser() error { + defer t.wg.Done() + for { + select { + case <-t.done: + return nil + case packet := <-t.in: + if len(packet) == 0 { + continue + } + metrics, err := t.parser.Parse(packet) + if err == nil { + t.storeMetrics(metrics) + } else { + log.Printf("Malformed packet: [%s], Error: %s\n", + string(packet), err) + } + } + } +} + +func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error { + t.Lock() + defer t.Unlock() + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + return nil +} + +// forget a TCP connection +func (t *TcpListener) forget(id string) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + delete(t.conns, id) +} + +// remember a TCP connection +func (t *TcpListener) remember(id string, conn *net.TCPConn) { + t.cleanup.Lock() + defer t.cleanup.Unlock() + t.conns[id] = conn +} + +func init() { + inputs.Add("tcp_listener", func() telegraf.Input { + return &TcpListener{} + }) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go new file mode 100644 index 000000000..b4aec9dd2 --- /dev/null +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -0,0 +1,259 @@ +package tcp_listener + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" + + testMsgs = ` +cpu_load_short,host=server02 value=12.0 1422568543702900257 +cpu_load_short,host=server03 value=12.0 1422568543702900257 +cpu_load_short,host=server04 value=12.0 1422568543702900257 +cpu_load_short,host=server05 value=12.0 1422568543702900257 +cpu_load_short,host=server06 value=12.0 1422568543702900257 +` +) + +func newTestTcpListener() (*TcpListener, chan []byte) { + in := make(chan []byte, 1500) + listener := &TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + in: in, + done: make(chan struct{}), + } + return listener, in +} + +func TestConnectTCP(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8194", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8194") + require.NoError(t, err) + + // send single message to socket + fmt.Fprintf(conn, testMsg) + time.Sleep(time.Millisecond * 15) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + + // send multiple messages to socket + fmt.Fprintf(conn, testMsgs) + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// Test that MaxTCPConections is respected +func TestConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8195") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + assert.Equal(t, + "Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected when max==1 +func TestConcurrentConns1(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8196", + AllowedPendingMessages: 10000, + MaxTCPConnections: 1, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8196") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8196") + buf := make([]byte, 1500) + n, err := conn.Read(buf) + assert.NoError(t, err) + assert.Equal(t, + "Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+ + "You may want to increase max_tcp_connections in"+ + " the Telegraf tcp listener configuration.\n", + string(buf[:n])) + + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected +func TestCloseConcurrentConns(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8195", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8195") + assert.NoError(t, err) + + listener.Stop() +} + +func TestRunParser(t *testing.T) { + var testmsg = []byte(testMsg) + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + if a := acc.NFields(); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) +} + +func TestRunParserInvalidMsg(t *testing.T) { + var testmsg = []byte("cpu_load_short") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + + if a := acc.NFields(); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +func TestRunParserGraphiteMsg(t *testing.T) { + var testmsg = []byte("cpu.load.graphite 12 1454780029") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "cpu_load_graphite", + map[string]interface{}{"value": float64(12)}) +} + +func TestRunParserJSONMsg(t *testing.T) { + var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") + + listener, in := newTestTcpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.wg.Add(1) + go listener.tcpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "udp_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +}