Adding a TCP input listener

closes #481
This commit is contained in:
Cameron Sparr 2016-03-02 15:31:46 +00:00 committed by Michele Fadda
parent b863992a13
commit 3b8eaeb234
5 changed files with 555 additions and 0 deletions

View File

@ -10,6 +10,7 @@
- [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert!
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
### Bugfixes ### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"

View File

@ -47,6 +47,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/system" _ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"

View File

@ -0,0 +1,30 @@
# TCP listener service input plugin
The TCP listener is a service input plugin that listens for messages on a TCP
socket and adds those messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
### Configuration:
This is a sample configuration for the plugin.
```toml
# Generic TCP listener
[[inputs.tcp_listener]]
## Address and port to host TCP listener on
service_address = ":8094"
## Number of TCP messages allowed to queue up. Once filled, the
## TCP listener will start dropping packets.
allowed_pending_messages = 10000
## Maximum number of concurrent TCP connections to allow
max_tcp_connections = 250
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

View File

@ -0,0 +1,264 @@
package tcp_listener
import (
"bufio"
"fmt"
"log"
"net"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type TcpListener struct {
ServiceAddress string
AllowedPendingMessages int
MaxTCPConnections int `toml:"max_tcp_connections"`
sync.Mutex
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup
in chan []byte
done chan struct{}
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// track the listener here so we can close it in Stop()
listener *net.TCPListener
// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
parser parsers.Parser
acc telegraf.Accumulator
}
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
const sampleConfig = `
## Address and port to host TCP listener on
service_address = ":8094"
## Number of TCP messages allowed to queue up. Once filled, the
## TCP listener will start dropping packets.
allowed_pending_messages = 10000
## Maximum number of concurrent TCP connections to allow
max_tcp_connections = 250
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
func (t *TcpListener) SampleConfig() string {
return sampleConfig
}
func (t *TcpListener) Description() string {
return "Generic TCP listener"
}
// All the work is done in the Start() function, so this is just a dummy
// function.
func (t *TcpListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (t *TcpListener) SetParser(parser parsers.Parser) {
t.parser = parser
}
// Start starts the tcp listener service.
func (t *TcpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
t.acc = acc
t.in = make(chan []byte, t.AllowedPendingMessages)
t.done = make(chan struct{})
t.accept = make(chan bool, t.MaxTCPConnections)
t.conns = make(map[string]*net.TCPConn)
for i := 0; i < t.MaxTCPConnections; i++ {
t.accept <- true
}
// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress)
t.listener, err = net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
return err
}
log.Println("TCP server listening on: ", t.listener.Addr().String())
t.wg.Add(2)
go t.tcpListen()
go t.tcpParser()
log.Printf("Started TCP listener service on %s\n", t.ServiceAddress)
return nil
}
// Stop cleans up all resources
func (t *TcpListener) Stop() {
t.Lock()
defer t.Unlock()
close(t.done)
t.listener.Close()
// Close all open TCP connections
// - get all conns from the t.conns map and put into slice
// - this is so the forget() function doesnt conflict with looping
// over the t.conns map
var conns []*net.TCPConn
t.cleanup.Lock()
for _, conn := range t.conns {
conns = append(conns, conn)
}
t.cleanup.Unlock()
for _, conn := range conns {
conn.Close()
}
t.wg.Wait()
close(t.in)
log.Println("Stopped TCP listener service on ", t.ServiceAddress)
}
// tcpListen listens for incoming TCP connections.
func (t *TcpListener) tcpListen() error {
defer t.wg.Done()
for {
select {
case <-t.done:
return nil
default:
// Accept connection:
conn, err := t.listener.AcceptTCP()
if err != nil {
return err
}
log.Printf("Received TCP Connection from %s", conn.RemoteAddr())
select {
case <-t.accept:
// not over connection limit, handle the connection properly.
t.wg.Add(1)
// generate a random id for this TCPConn
id := internal.RandomString(6)
t.remember(id, conn)
go t.handler(conn, id)
default:
// We are over the connection limit, refuse & close.
t.refuser(conn)
}
}
}
}
// refuser refuses a TCP connection
func (t *TcpListener) refuser(conn *net.TCPConn) {
// Tell the connection why we are closing.
fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+
" reached, closing.\nYou may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n", t.MaxTCPConnections)
conn.Close()
log.Printf("Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}
// handler handles a single TCP Connection
func (t *TcpListener) handler(conn *net.TCPConn, id string) {
// connection cleanup function
defer func() {
t.wg.Done()
conn.Close()
log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
// Add one connection potential back to channel when this one closes
t.accept <- true
t.forget(id)
}()
scanner := bufio.NewScanner(conn)
for {
select {
case <-t.done:
return
default:
if !scanner.Scan() {
return
}
buf := scanner.Bytes()
select {
case t.in <- buf:
default:
log.Printf(dropwarn, string(buf))
}
}
}
}
// tcpParser parses the incoming tcp byte packets
func (t *TcpListener) tcpParser() error {
defer t.wg.Done()
for {
select {
case <-t.done:
return nil
case packet := <-t.in:
if len(packet) == 0 {
continue
}
metrics, err := t.parser.Parse(packet)
if err == nil {
t.storeMetrics(metrics)
} else {
log.Printf("Malformed packet: [%s], Error: %s\n",
string(packet), err)
}
}
}
}
func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
// forget a TCP connection
func (t *TcpListener) forget(id string) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
delete(t.conns, id)
}
// remember a TCP connection
func (t *TcpListener) remember(id string, conn *net.TCPConn) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
t.conns[id] = conn
}
func init() {
inputs.Add("tcp_listener", func() telegraf.Input {
return &TcpListener{}
})
}

View File

@ -0,0 +1,259 @@
package tcp_listener
import (
"fmt"
"net"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
testMsgs = `
cpu_load_short,host=server02 value=12.0 1422568543702900257
cpu_load_short,host=server03 value=12.0 1422568543702900257
cpu_load_short,host=server04 value=12.0 1422568543702900257
cpu_load_short,host=server05 value=12.0 1422568543702900257
cpu_load_short,host=server06 value=12.0 1422568543702900257
`
)
func newTestTcpListener() (*TcpListener, chan []byte) {
in := make(chan []byte, 1500)
listener := &TcpListener{
ServiceAddress: ":8194",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
in: in,
done: make(chan struct{}),
}
return listener, in
}
func TestConnectTCP(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8194",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8194")
require.NoError(t, err)
// send single message to socket
fmt.Fprintf(conn, testMsg)
time.Sleep(time.Millisecond * 15)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
// send multiple messages to socket
fmt.Fprintf(conn, testMsgs)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
// Test that MaxTCPConections is respected
func TestConcurrentConns(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8195",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err)
net.Dial("tcp", "127.0.0.1:8195")
buf := make([]byte, 1500)
n, err := conn.Read(buf)
assert.NoError(t, err)
assert.Equal(t,
"Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+
"You may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n",
string(buf[:n]))
_, err = conn.Write([]byte(testMsg))
assert.NoError(t, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
}
// Test that MaxTCPConections is respected when max==1
func TestConcurrentConns1(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8196",
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8196")
assert.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8196")
assert.NoError(t, err)
net.Dial("tcp", "127.0.0.1:8196")
buf := make([]byte, 1500)
n, err := conn.Read(buf)
assert.NoError(t, err)
assert.Equal(t,
"Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+
"You may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n",
string(buf[:n]))
_, err = conn.Write([]byte(testMsg))
assert.NoError(t, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
}
// Test that MaxTCPConections is respected
func TestCloseConcurrentConns(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8195",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err)
listener.Stop()
}
func TestRunParser(t *testing.T) {
var testmsg = []byte(testMsg)
listener, in := newTestTcpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)
if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}
func TestRunParserInvalidMsg(t *testing.T) {
var testmsg = []byte("cpu_load_short")
listener, in := newTestTcpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
time.Sleep(time.Millisecond * 25)
if a := acc.NFields(); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
}
func TestRunParserGraphiteMsg(t *testing.T) {
var testmsg = []byte("cpu.load.graphite 12 1454780029")
listener, in := newTestTcpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)
acc.AssertContainsFields(t, "cpu_load_graphite",
map[string]interface{}{"value": float64(12)})
}
func TestRunParserJSONMsg(t *testing.T) {
var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n")
listener, in := newTestTcpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)
acc.AssertContainsFields(t, "udp_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}