From ab8438dcc62130ebff6daac49ebf3237d97348e2 Mon Sep 17 00:00:00 2001 From: josephpeacock <51184065+josephpeacock@users.noreply.github.com> Date: Tue, 3 Mar 2020 14:47:33 -0800 Subject: [PATCH] Add threaded parsing in statsd input plugin (#6922) --- plugins/inputs/statsd/statsd.go | 16 +++-- plugins/inputs/statsd/statsd_test.go | 89 ++++++++++++++++++++++++---- 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 32b12a7e9..9c5780d00 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -31,6 +31,8 @@ const ( defaultSeparator = "_" defaultAllowPendingMessage = 10000 MaxTCPConnections = 250 + + parserGoRoutines = 5 ) // Statsd allows the importing of statsd and dogstatsd data. @@ -398,12 +400,14 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { }() } - // Start the line parser - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.parser() - }() + for i := 1; i <= parserGoRoutines; i++ { + // Start the line parser + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.parser() + }() + } s.Log.Infof("Started the statsd service on %q", s.ServiceAddress) return nil } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 1215eeb2d..f3daa117b 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -2,19 +2,21 @@ package statsd import ( "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "net" + "sync" "testing" "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( - testMsg = "test.tcp.msg:100|c" + testMsg = "test.tcp.msg:100|c" + producerThreads = 10 ) func NewTestStatsd() *Statsd { @@ -137,15 +139,30 @@ func BenchmarkUDP(b *testing.B) { if err != nil { panic(err) } - for i := 0; i < 250000; i++ { - fmt.Fprintf(conn, testMsg) + + var wg sync.WaitGroup + for i := 1; i <= producerThreads; i++ { + wg.Add(1) + go sendRequests(conn, &wg) } + wg.Wait() + // wait for 250,000 metrics to get added to accumulator - time.Sleep(time.Millisecond) + for len(listener.in) > 0 { + fmt.Printf("Left in buffer: %v \n", len(listener.in)) + time.Sleep(time.Millisecond) + } listener.Stop() } } +func sendRequests(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < 25000; i++ { + fmt.Fprintf(conn, testMsg) + } +} + // benchmark how long it takes to accept & process 100,000 metrics: func BenchmarkTCP(b *testing.B) { listener := Statsd{ @@ -169,11 +186,16 @@ func BenchmarkTCP(b *testing.B) { if err != nil { panic(err) } - for i := 0; i < 250000; i++ { - fmt.Fprintf(conn, testMsg) + var wg sync.WaitGroup + for i := 1; i <= producerThreads; i++ { + wg.Add(1) + go sendRequests(conn, &wg) } + wg.Wait() // wait for 250,000 metrics to get added to accumulator - time.Sleep(time.Millisecond) + for len(listener.in) > 0 { + time.Sleep(time.Millisecond) + } listener.Stop() } } @@ -1678,3 +1700,48 @@ func TestTCP(t *testing.T) { testutil.IgnoreTime(), ) } + +func TestUdp(t *testing.T) { + statsd := Statsd{ + Log: testutil.Logger{}, + Protocol: "udp", + ServiceAddress: "localhost:8125", + AllowedPendingMessages: 250000, + } + var acc testutil.Accumulator + require.NoError(t, statsd.Start(&acc)) + defer statsd.Stop() + + conn, err := net.Dial("udp", "127.0.0.1:8125") + _, err = conn.Write([]byte("cpu.time_idle:42|c\n")) + require.NoError(t, err) + err = conn.Close() + require.NoError(t, err) + + for { + err = statsd.Gather(&acc) + require.NoError(t, err) + + if len(acc.Metrics) > 0 { + break + } + } + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + telegraf.Counter, + ), + }, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +}