From 32d1e71a7ecd8f1e92c97450909f68e1428b76aa Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 21 Nov 2019 19:11:17 -0700 Subject: [PATCH] Add decoding and tests to socket_listener (#6660) --- plugins/inputs/socket_listener/README.md | 5 ++ .../inputs/socket_listener/socket_listener.go | 28 ++++++++- .../socket_listener/socket_listener_test.go | 63 +++++++++++++++++-- 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index 1740d8bcf..ec1aa0bef 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -66,6 +66,10 @@ This is a sample configuration for the plugin. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # data_format = "influx" + + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + # content_encoding = "identity" ``` ## A Note on UDP OS Buffer Sizes @@ -84,6 +88,7 @@ at least 8MB before trying to run large amounts of UDP traffic to your instance. 8MB is just a recommendation, and can be adjusted higher. ### Linux + Check the current UDP/IP receive buffer limit & default by typing the following commands: diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b5b4d0405..b1e933851 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -119,7 +119,14 @@ func (ssl *streamSocketListener) read(c net.Conn) { if !scnr.Scan() { break } - metrics, err := ssl.Parse(scnr.Bytes()) + + body, err := ssl.decoder.Decode(scnr.Bytes()) + if err != nil { + ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error()) + continue + } + + metrics, err := ssl.Parse(body) if err != nil { ssl.Log.Errorf("Unable to parse incoming line: %s", err.Error()) // TODO rate limit @@ -155,7 +162,12 @@ func (psl *packetSocketListener) listen() { break } - metrics, err := psl.Parse(buf[:n]) + body, err := psl.decoder.Decode(buf[:n]) + if err != nil { + psl.Log.Errorf("Unable to decode incoming packet: %s", err.Error()) + } + + metrics, err := psl.Parse(body) if err != nil { psl.Log.Errorf("Unable to parse incoming packet: %s", err.Error()) // TODO rate limit @@ -174,6 +186,7 @@ type SocketListener struct { ReadTimeout *internal.Duration `toml:"read_timeout"` KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"` SocketMode string `toml:"socket_mode"` + ContentEncoding string `toml:"content_encoding"` tlsint.ServerConfig wg sync.WaitGroup @@ -183,6 +196,7 @@ type SocketListener struct { parsers.Parser telegraf.Accumulator io.Closer + decoder internal.ContentDecoder } func (sl *SocketListener) Description() string { @@ -244,6 +258,10 @@ func (sl *SocketListener) SampleConfig() string { ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # data_format = "influx" + + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + # content_encoding = "identity" ` } @@ -265,6 +283,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { protocol := spl[0] addr := spl[1] + var err error + sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding) + if err != nil { + return err + } + if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" { // no good way of testing for "file does not exist". // Instead just ignore error and blow up when we try to listen, which will diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 481a0c1a5..c6adf4cde 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -180,12 +180,65 @@ func TestSocketListener_unixgram(t *testing.T) { testSocketListener(t, sl, client) } +func TestSocketListenerDecode_tcp(t *testing.T) { + defer testEmptyLog(t)() + + sl := newSocketListener() + sl.Log = testutil.Logger{} + sl.ServiceAddress = "tcp://127.0.0.1:0" + sl.ReadBufferSize = internal.Size{Size: 1024} + sl.ContentEncoding = "gzip" + + acc := &testutil.Accumulator{} + err := sl.Start(acc) + require.NoError(t, err) + defer sl.Stop() + + client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String()) + require.NoError(t, err) + + testSocketListener(t, sl, client) +} + +func TestSocketListenerDecode_udp(t *testing.T) { + defer testEmptyLog(t)() + + sl := newSocketListener() + sl.Log = testutil.Logger{} + sl.ServiceAddress = "udp://127.0.0.1:0" + sl.ReadBufferSize = internal.Size{Size: 1024} + sl.ContentEncoding = "gzip" + + acc := &testutil.Accumulator{} + err := sl.Start(acc) + require.NoError(t, err) + defer sl.Stop() + + client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) + require.NoError(t, err) + + testSocketListener(t, sl, client) +} + func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { - mstr12 := "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n" - mstr3 := "test,foo=zab v=3i 123456791" - client.Write([]byte(mstr12)) - client.Write([]byte(mstr3)) - if _, ok := client.(net.Conn); ok { + mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n") + mstr3 := []byte("test,foo=zab v=3i 123456791") + + if sl.ContentEncoding == "gzip" { + encoder, err := internal.NewContentEncoder(sl.ContentEncoding) + require.NoError(t, err) + mstr12, err = encoder.Encode(mstr12) + require.NoError(t, err) + + encoder, err = internal.NewContentEncoder(sl.ContentEncoding) + require.NoError(t, err) + mstr3, err = encoder.Encode(mstr3) + require.NoError(t, err) + } + + client.Write(mstr12) + client.Write(mstr3) + if client.LocalAddr().Network() != "udp" { // stream connection. needs trailing newline to terminate mstr3 client.Write([]byte{'\n'}) }