tcp/udp listeners, remove locks & improve test coverage

This commit is contained in:
Cameron Sparr 2016-07-27 18:16:29 +01:00
parent 841729c0f9
commit c991b579d2
5 changed files with 176 additions and 32 deletions

View File

@ -158,7 +158,6 @@ func (t *TcpListener) tcpListen() error {
if err != nil { if err != nil {
return err return err
} }
// log.Printf("Received TCP Connection from %s", conn.RemoteAddr())
select { select {
case <-t.accept: case <-t.accept:
@ -194,7 +193,6 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
defer func() { defer func() {
t.wg.Done() t.wg.Done()
conn.Close() conn.Close()
// log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
// Add one connection potential back to channel when this one closes // Add one connection potential back to channel when this one closes
t.accept <- true t.accept <- true
t.forget(id) t.forget(id)
@ -239,14 +237,19 @@ func (t *TcpListener) tcpParser() error {
for { for {
select { select {
case <-t.done: case <-t.done:
return nil // drain input packets before finishing:
if len(t.in) == 0 {
return nil
}
case packet = <-t.in: case packet = <-t.in:
if len(packet) == 0 { if len(packet) == 0 {
continue continue
} }
metrics, err = t.parser.Parse(packet) metrics, err = t.parser.Parse(packet)
if err == nil { if err == nil {
t.storeMetrics(metrics) for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else { } else {
t.malformed++ t.malformed++
if t.malformed == 1 || t.malformed%1000 == 0 { if t.malformed == 1 || t.malformed%1000 == 0 {
@ -257,15 +260,6 @@ func (t *TcpListener) tcpParser() error {
} }
} }
func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
// forget a TCP connection // forget a TCP connection
func (t *TcpListener) forget(id string) { func (t *TcpListener) forget(id string) {
t.cleanup.Lock() t.cleanup.Lock()

View File

@ -37,6 +37,62 @@ func newTestTcpListener() (*TcpListener, chan []byte) {
return listener, in return listener, in
} }
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkTCP(b *testing.B) {
listener := TcpListener{
ServiceAddress: ":8198",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
for n := 0; n < b.N; n++ {
err := listener.Start(acc)
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8198")
if err != nil {
panic(err)
}
for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg)
}
// wait for 100,000 metrics to get added to accumulator
time.Sleep(time.Millisecond)
listener.Stop()
}
}
func TestHighTrafficTCP(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8199",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
// send multiple messages to socket
err := listener.Start(acc)
require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8199")
require.NoError(t, err)
for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg)
}
time.Sleep(time.Millisecond)
listener.Stop()
assert.Equal(t, 100000, len(acc.Metrics))
}
func TestConnectTCP(t *testing.T) { func TestConnectTCP(t *testing.T) {
listener := TcpListener{ listener := TcpListener{
ServiceAddress: ":8194", ServiceAddress: ":8194",

View File

@ -3,8 +3,8 @@ package udp_listener
import ( import (
"log" "log"
"net" "net"
"strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -99,9 +99,11 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
} }
func (u *UdpListener) Stop() { func (u *UdpListener) Stop() {
u.Lock()
defer u.Unlock()
close(u.done) close(u.done)
u.listener.Close()
u.wg.Wait() u.wg.Wait()
u.listener.Close()
close(u.in) close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress) log.Println("Stopped UDP listener service on ", u.ServiceAddress)
} }
@ -122,9 +124,13 @@ func (u *UdpListener) udpListen() error {
case <-u.done: case <-u.done:
return nil return nil
default: default:
u.listener.SetReadDeadline(time.Now().Add(time.Second))
n, _, err := u.listener.ReadFromUDP(buf) n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil {
log.Printf("ERROR: %s\n", err.Error()) if err, ok := err.(net.Error); ok && err.Timeout() {
} else {
log.Printf("ERROR: %s\n", err.Error())
}
continue continue
} }
bufCopy := make([]byte, n) bufCopy := make([]byte, n)
@ -151,11 +157,15 @@ func (u *UdpListener) udpParser() error {
for { for {
select { select {
case <-u.done: case <-u.done:
return nil if len(u.in) == 0 {
return nil
}
case packet = <-u.in: case packet = <-u.in:
metrics, err = u.parser.Parse(packet) metrics, err = u.parser.Parse(packet)
if err == nil { if err == nil {
u.storeMetrics(metrics) for _, m := range metrics {
u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else { } else {
u.malformed++ u.malformed++
if u.malformed == 1 || u.malformed%1000 == 0 { if u.malformed == 1 || u.malformed%1000 == 0 {
@ -166,15 +176,6 @@ func (u *UdpListener) udpParser() error {
} }
} }
func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
u.Lock()
defer u.Unlock()
for _, m := range metrics {
u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
func init() { func init() {
inputs.Add("udp_listener", func() telegraf.Input { inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{} return &UdpListener{}

View File

@ -1,20 +1,36 @@
package udp_listener package udp_listener
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
testMsgs = `
cpu_load_short,host=server02 value=12.0 1422568543702900257
cpu_load_short,host=server03 value=12.0 1422568543702900257
cpu_load_short,host=server04 value=12.0 1422568543702900257
cpu_load_short,host=server05 value=12.0 1422568543702900257
cpu_load_short,host=server06 value=12.0 1422568543702900257
`
) )
func newTestUdpListener() (*UdpListener, chan []byte) { func newTestUdpListener() (*UdpListener, chan []byte) {
in := make(chan []byte, 1500) in := make(chan []byte, 1500)
listener := &UdpListener{ listener := &UdpListener{
ServiceAddress: ":8125", ServiceAddress: ":8125",
UDPPacketSize: 1500,
AllowedPendingMessages: 10000, AllowedPendingMessages: 10000,
in: in, in: in,
done: make(chan struct{}), done: make(chan struct{}),
@ -22,6 +38,72 @@ func newTestUdpListener() (*UdpListener, chan []byte) {
return listener, in return listener, in
} }
func TestHighTrafficUDP(t *testing.T) {
listener := UdpListener{
ServiceAddress: ":8126",
AllowedPendingMessages: 100000,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
// send multiple messages to socket
err := listener.Start(acc)
require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8126")
require.NoError(t, err)
for i := 0; i < 20000; i++ {
// arbitrary, just to give the OS buffer some slack handling the
// packet storm.
time.Sleep(time.Microsecond)
fmt.Fprintf(conn, testMsgs)
}
time.Sleep(time.Millisecond)
listener.Stop()
// this is not an exact science, since UDP packets can easily get lost or
// dropped, but assume that the OS will be able to
// handle at least 90% of the sent UDP packets.
assert.InDelta(t, 100000, len(acc.Metrics), 10000)
}
func TestConnectUDP(t *testing.T) {
listener := UdpListener{
ServiceAddress: ":8127",
AllowedPendingMessages: 10000,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8127")
require.NoError(t, err)
// send single message to socket
fmt.Fprintf(conn, testMsg)
time.Sleep(time.Millisecond * 15)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
// send multiple messages to socket
fmt.Fprintf(conn, testMsgs)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
func TestRunParser(t *testing.T) { func TestRunParser(t *testing.T) {
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257") var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257")

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -27,9 +28,11 @@ func (p *Metric) String() string {
type Accumulator struct { type Accumulator struct {
sync.Mutex sync.Mutex
Metrics []*Metric Metrics []*Metric
Errors []error nMetrics uint64
debug bool Discard bool
Errors []error
debug bool
} }
// Add adds a measurement point to the accumulator // Add adds a measurement point to the accumulator
@ -43,6 +46,10 @@ func (a *Accumulator) Add(
a.AddFields(measurement, fields, tags, t...) a.AddFields(measurement, fields, tags, t...)
} }
func (a *Accumulator) NMetrics() uint64 {
return atomic.LoadUint64(&a.nMetrics)
}
// AddFields adds a measurement point with a specified timestamp. // AddFields adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFields( func (a *Accumulator) AddFields(
measurement string, measurement string,
@ -50,6 +57,10 @@ func (a *Accumulator) AddFields(
tags map[string]string, tags map[string]string,
timestamp ...time.Time, timestamp ...time.Time,
) { ) {
atomic.AddUint64(&a.nMetrics, 1)
if a.Discard {
return
}
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
if tags == nil { if tags == nil {