Add TCP listener for statsd input (#2293)

This commit is contained in:
Slawomir Skowron 2017-08-08 20:41:26 +02:00 committed by Daniel Nelson
parent 6ebb93abcc
commit b9b5b74ede
5 changed files with 350 additions and 8 deletions

View File

@ -26,6 +26,7 @@
- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support - [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
- [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin - [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
- [#2293](https://github.com/influxdata/telegraf/pull/2293): Add TCP listener for statsd input
- [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted - [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
- [#2817](https://github.com/influxdata/telegraf/pull/2817): Add timeout option to IPMI sensor plugin - [#2817](https://github.com/influxdata/telegraf/pull/2817): Add timeout option to IPMI sensor plugin
- [#2883](https://github.com/influxdata/telegraf/pull/2883): Add support for an optional SSL/TLS configuration to nginx input plugin - [#2883](https://github.com/influxdata/telegraf/pull/2883): Add support for an optional SSL/TLS configuration to nginx input plugin

View File

@ -2452,6 +2452,10 @@
# # Statsd Server # # Statsd Server
# [[inputs.statsd]] # [[inputs.statsd]]
# ## Protocol, must be "tcp" or "udp"
# protocol = "udp"
# ## Maximum number of concurrent TCP connections to allow
# max_tcp_connections = 250
# ## Address and port to host UDP listener on # ## Address and port to host UDP listener on
# service_address = ":8125" # service_address = ":8125"
# #

View File

@ -5,6 +5,12 @@
```toml ```toml
# Statsd Server # Statsd Server
[[inputs.statsd]] [[inputs.statsd]]
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"
## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250
## Address and port to host UDP listener on ## Address and port to host UDP listener on
service_address = ":8125" service_address = ":8125"
@ -146,6 +152,9 @@ metric type:
### Plugin arguments ### Plugin arguments
- **protocol** string: Protocol used in listener - tcp or udp options
- **max_tcp_connections** []int: Maximum number of concurrent TCP connections
to allow. Used when protocol is set to tcp.
- **service_address** string: Address to listen for statsd UDP packets on - **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval - **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval - **delete_counters** boolean: Delete counters on every collection interval

View File

@ -1,6 +1,7 @@
package statsd package statsd
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"log" "log"
@ -14,7 +15,9 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
) )
const ( const (
@ -24,15 +27,24 @@ const (
defaultFieldName = "value" defaultFieldName = "value"
defaultProtocol = "udp"
defaultSeparator = "_" defaultSeparator = "_"
defaultAllowPendingMessage = 10000 defaultAllowPendingMessage = 10000
MaxTCPConnections = 250
) )
var dropwarn = "E! Error: statsd message queue full. " + var dropwarn = "E! Error: statsd message queue full. " +
"We have dropped %d messages so far. " + "We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "E! Statsd over TCP has received %d malformed packets" +
" thus far."
type Statsd struct { type Statsd struct {
// Protocol used on listener - udp or tcp
Protocol string `toml:"protocol"`
// Address & Port to serve from // Address & Port to serve from
ServiceAddress string ServiceAddress string
@ -64,9 +76,17 @@ type Statsd struct {
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
sync.Mutex sync.Mutex
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// drops tracks the number of dropped metrics. // drops tracks the number of dropped metrics.
drops int drops int
// malformed tracks the number of malformed packets
malformed int
// Channel for all incoming statsd packets // Channel for all incoming statsd packets
in chan []byte in chan []byte
@ -83,9 +103,24 @@ type Statsd struct {
// bucket -> influx templates // bucket -> influx templates
Templates []string Templates []string
listener *net.UDPConn // Protocol listeners
UDPlistener *net.UDPConn
TCPlistener *net.TCPListener
// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
MaxTCPConnections int `toml:"max_tcp_connections"`
graphiteParser *graphite.GraphiteParser graphiteParser *graphite.GraphiteParser
acc telegraf.Accumulator
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
} }
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate> // One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
@ -128,10 +163,16 @@ type cachedtimings struct {
} }
func (_ *Statsd) Description() string { func (_ *Statsd) Description() string {
return "Statsd Server" return "Statsd UDP/TCP Server"
} }
const sampleConfig = ` const sampleConfig = `
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"
## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250
## Address and port to host UDP listener on ## Address and port to host UDP listener on
service_address = ":8125" service_address = ":8125"
@ -247,6 +288,27 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings) s.timings = make(map[string]cachedtimings)
s.Lock()
defer s.Unlock()
//
tags := map[string]string{
"address": s.ServiceAddress,
}
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.in = make(chan []byte, s.AllowedPendingMessages)
s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn)
for i := 0; i < s.MaxTCPConnections; i++ {
s.accept <- true
}
if s.ConvertNames { if s.ConvertNames {
log.Printf("I! WARNING statsd: convert_names config option is deprecated," + log.Printf("I! WARNING statsd: convert_names config option is deprecated," +
" please use metric_separator instead") " please use metric_separator instead")
@ -258,23 +320,67 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.wg.Add(2) s.wg.Add(2)
// Start the UDP listener // Start the UDP listener
switch s.Protocol {
case "udp":
go s.udpListen() go s.udpListen()
case "tcp":
go s.tcpListen()
}
// Start the line parser // Start the line parser
go s.parser() go s.parser()
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
return nil return nil
} }
// tcpListen() starts listening for udp packets on the configured port.
func (s *Statsd) tcpListen() error {
defer s.wg.Done()
// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress)
s.TCPlistener, err = net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenTCP - %s", err)
return err
}
log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String())
for {
select {
case <-s.done:
return nil
default:
// Accept connection:
conn, err := s.TCPlistener.AcceptTCP()
if err != nil {
return err
}
select {
case <-s.accept:
// not over connection limit, handle the connection properly.
s.wg.Add(1)
// generate a random id for this TCPConn
id := internal.RandomString(6)
s.remember(id, conn)
go s.handler(conn, id)
default:
// We are over the connection limit, refuse & close.
s.refuser(conn)
}
}
}
}
// udpListen starts listening for udp packets on the configured port. // udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error { func (s *Statsd) udpListen() error {
defer s.wg.Done() defer s.wg.Done()
var err error var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
s.listener, err = net.ListenUDP("udp", address) s.UDPlistener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String()) log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String())
buf := make([]byte, UDP_MAX_PACKET_SIZE) buf := make([]byte, UDP_MAX_PACKET_SIZE)
for { for {
@ -282,7 +388,7 @@ func (s *Statsd) udpListen() error {
case <-s.done: case <-s.done:
return nil return nil
default: default:
n, _, err := s.listener.ReadFromUDP(buf) n, _, err := s.UDPlistener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("E! Error READ: %s\n", err.Error()) log.Printf("E! Error READ: %s\n", err.Error())
continue continue
@ -637,20 +743,111 @@ func (s *Statsd) aggregate(m metric) {
} }
} }
// handler handles a single TCP Connection
func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.CurrentConnections.Incr(1)
s.TotalConnections.Incr(1)
// connection cleanup function
defer func() {
s.wg.Done()
conn.Close()
// Add one connection potential back to channel when this one closes
s.accept <- true
s.forget(id)
s.CurrentConnections.Incr(-1)
}()
var n int
scanner := bufio.NewScanner(conn)
for {
select {
case <-s.done:
return
default:
if !scanner.Scan() {
return
}
n = len(scanner.Bytes())
if n == 0 {
continue
}
s.BytesRecv.Incr(int64(n))
s.PacketsRecv.Incr(1)
bufCopy := make([]byte, n+1)
copy(bufCopy, scanner.Bytes())
bufCopy[n] = '\n'
select {
case s.in <- bufCopy:
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
log.Printf(dropwarn, s.drops)
}
}
}
}
}
// refuser refuses a TCP connection
func (s *Statsd) refuser(conn *net.TCPConn) {
conn.Close()
log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}
// forget a TCP connection
func (s *Statsd) forget(id string) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
delete(s.conns, id)
}
// remember a TCP connection
func (s *Statsd) remember(id string, conn *net.TCPConn) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
s.conns[id] = conn
}
func (s *Statsd) Stop() { func (s *Statsd) Stop() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
log.Println("I! Stopping the statsd service") log.Println("I! Stopping the statsd service")
close(s.done) close(s.done)
s.listener.Close() switch s.Protocol {
case "udp":
s.UDPlistener.Close()
case "tcp":
s.TCPlistener.Close()
// Close all open TCP connections
// - get all conns from the s.conns map and put into slice
// - this is so the forget() function doesnt conflict with looping
// over the s.conns map
var conns []*net.TCPConn
s.cleanup.Lock()
for _, conn := range s.conns {
conns = append(conns, conn)
}
s.cleanup.Unlock()
for _, conn := range conns {
conn.Close()
}
default:
s.UDPlistener.Close()
}
s.wg.Wait() s.wg.Wait()
close(s.in) close(s.in)
log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress)
} }
func init() { func init() {
inputs.Add("statsd", func() telegraf.Input { inputs.Add("statsd", func() telegraf.Input {
return &Statsd{ return &Statsd{
Protocol: defaultProtocol,
ServiceAddress: ":8125", ServiceAddress: ":8125",
MaxTCPConnections: 250,
MetricSeparator: "_", MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage, AllowedPendingMessages: defaultAllowPendingMessage,
DeleteCounters: true, DeleteCounters: true,

View File

@ -3,11 +3,32 @@ package statsd
import ( import (
"errors" "errors"
"fmt" "fmt"
"net"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
const (
testMsg = "test.tcp.msg:100|c"
)
func newTestTcpListener() (*Statsd, chan []byte) {
in := make(chan []byte, 1500)
listener := &Statsd{
Protocol: "tcp",
ServiceAddress: ":8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
in: in,
done: make(chan struct{}),
}
return listener, in
}
func NewTestStatsd() *Statsd { func NewTestStatsd() *Statsd {
s := Statsd{} s := Statsd{}
@ -24,6 +45,116 @@ func NewTestStatsd() *Statsd {
return &s return &s
} }
// Test that MaxTCPConections is respected
func TestConcurrentConns(t *testing.T) {
listener := Statsd{
Protocol: "tcp",
ServiceAddress: ":8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
_, err = conn.Write([]byte(testMsg))
assert.NoError(t, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
}
// Test that MaxTCPConections is respected when max==1
func TestConcurrentConns1(t *testing.T) {
listener := Statsd{
Protocol: "tcp",
ServiceAddress: ":8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
_, err = conn.Write([]byte(testMsg))
assert.NoError(t, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
}
// Test that MaxTCPConections is respected
func TestCloseConcurrentConns(t *testing.T) {
listener := Statsd{
Protocol: "tcp",
ServiceAddress: ":8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8125")
assert.NoError(t, err)
listener.Stop()
}
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkTCP(b *testing.B) {
listener := Statsd{
Protocol: "tcp",
ServiceAddress: ":8125",
AllowedPendingMessages: 250000,
MaxTCPConnections: 250,
}
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
for n := 0; n < b.N; n++ {
err := listener.Start(acc)
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8125")
if err != nil {
panic(err)
}
for i := 0; i < 250000; i++ {
fmt.Fprintf(conn, testMsg)
}
// wait for 250,000 metrics to get added to accumulator
time.Sleep(time.Millisecond)
listener.Stop()
}
}
// Valid lines should be parsed and their values should be cached // Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) { func TestParse_ValidLines(t *testing.T) {
s := NewTestStatsd() s := NewTestStatsd()