Add threaded parsing in statsd input plugin (#6922)

This commit is contained in:
josephpeacock 2020-03-03 14:47:33 -08:00 committed by GitHub
parent f04d84994d
commit ab8438dcc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 17 deletions

View File

@ -31,6 +31,8 @@ const (
defaultSeparator = "_"
defaultAllowPendingMessage = 10000
MaxTCPConnections = 250
parserGoRoutines = 5
)
// Statsd allows the importing of statsd and dogstatsd data.
@ -398,12 +400,14 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
}()
}
for i := 1; i <= parserGoRoutines; i++ {
// Start the line parser
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.parser()
}()
}
s.Log.Infof("Started the statsd service on %q", s.ServiceAddress)
return nil
}

View File

@ -2,19 +2,21 @@ package statsd
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testMsg = "test.tcp.msg:100|c"
producerThreads = 10
)
func NewTestStatsd() *Statsd {
@ -137,15 +139,30 @@ func BenchmarkUDP(b *testing.B) {
if err != nil {
panic(err)
}
for i := 0; i < 250000; i++ {
fmt.Fprintf(conn, testMsg)
var wg sync.WaitGroup
for i := 1; i <= producerThreads; i++ {
wg.Add(1)
go sendRequests(conn, &wg)
}
wg.Wait()
// wait for 250,000 metrics to get added to accumulator
for len(listener.in) > 0 {
fmt.Printf("Left in buffer: %v \n", len(listener.in))
time.Sleep(time.Millisecond)
}
listener.Stop()
}
}
func sendRequests(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 25000; i++ {
fmt.Fprintf(conn, testMsg)
}
}
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkTCP(b *testing.B) {
listener := Statsd{
@ -169,11 +186,16 @@ func BenchmarkTCP(b *testing.B) {
if err != nil {
panic(err)
}
for i := 0; i < 250000; i++ {
fmt.Fprintf(conn, testMsg)
var wg sync.WaitGroup
for i := 1; i <= producerThreads; i++ {
wg.Add(1)
go sendRequests(conn, &wg)
}
wg.Wait()
// wait for 250,000 metrics to get added to accumulator
for len(listener.in) > 0 {
time.Sleep(time.Millisecond)
}
listener.Stop()
}
}
@ -1678,3 +1700,48 @@ func TestTCP(t *testing.T) {
testutil.IgnoreTime(),
)
}
func TestUdp(t *testing.T) {
statsd := Statsd{
Log: testutil.Logger{},
Protocol: "udp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 250000,
}
var acc testutil.Accumulator
require.NoError(t, statsd.Start(&acc))
defer statsd.Stop()
conn, err := net.Dial("udp", "127.0.0.1:8125")
_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)
for {
err = statsd.Gather(&acc)
require.NoError(t, err)
if len(acc.Metrics) > 0 {
break
}
}
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}