Fix gzip support in socket_listener with tcp sockets (#7446)

This commit is contained in:
Daniel Nelson 2020-05-04 16:26:05 -07:00 committed by GitHub
parent f351e6a68f
commit d16485e1a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 24 deletions

View File

@ -1,18 +1,78 @@
package internal package internal
import ( import (
"bufio"
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"errors" "errors"
"io" "io"
) )
// NewStreamContentDecoder returns a reader that will decode the stream
// according to the encoding type.
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
switch encoding {
case "gzip":
return NewGzipReader(r)
case "identity", "":
return r, nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// GzipReader is similar to gzip.Reader but reads only a single gzip stream per read.
type GzipReader struct {
r io.Reader
z *gzip.Reader
endOfStream bool
}
func NewGzipReader(r io.Reader) (io.Reader, error) {
// We need a read that implements ByteReader in order to line up the next
// stream.
br := bufio.NewReader(r)
// Reads the first gzip stream header.
z, err := gzip.NewReader(br)
if err != nil {
return nil, err
}
// Prevent future calls to Read from reading the following gzip header.
z.Multistream(false)
return &GzipReader{r: br, z: z}, nil
}
func (r *GzipReader) Read(b []byte) (int, error) {
if r.endOfStream {
// Reads the next gzip header and prepares for the next stream.
err := r.z.Reset(r.r)
if err != nil {
return 0, err
}
r.z.Multistream(false)
r.endOfStream = false
}
n, err := r.z.Read(b)
// Since multistream is disabled, io.EOF indicates the end of the gzip
// sequence. On the next read we must read the next gzip header.
if err == io.EOF {
r.endOfStream = true
return n, nil
}
return n, err
}
// NewContentEncoder returns a ContentEncoder for the encoding type. // NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) { func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding { switch encoding {
case "gzip": case "gzip":
return NewGzipEncoder() return NewGzipEncoder()
case "identity", "": case "identity", "":
return NewIdentityEncoder(), nil return NewIdentityEncoder(), nil
default: default:

View File

@ -1,6 +1,8 @@
package internal package internal
import ( import (
"bytes"
"io/ioutil"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -56,3 +58,37 @@ func TestIdentityEncodeDecode(t *testing.T) {
require.Equal(t, "howdy", string(actual)) require.Equal(t, "howdy", string(actual))
} }
func TestStreamIdentityDecode(t *testing.T) {
var r bytes.Buffer
n, err := r.Write([]byte("howdy"))
require.NoError(t, err)
require.Equal(t, 5, n)
dec, err := NewStreamContentDecoder("identity", &r)
require.NoError(t, err)
data, err := ioutil.ReadAll(dec)
require.NoError(t, err)
require.Equal(t, []byte("howdy"), data)
}
func TestStreamGzipDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
written, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
w := bytes.NewBuffer(written)
dec, err := NewStreamContentDecoder("gzip", w)
require.NoError(t, err)
b := make([]byte, 10)
n, err := dec.Read(b)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, []byte("howdy"), b[:n])
}

View File

@ -111,7 +111,12 @@ func (ssl *streamSocketListener) read(c net.Conn) {
defer ssl.removeConnection(c) defer ssl.removeConnection(c)
defer c.Close() defer c.Close()
scnr := bufio.NewScanner(c) decoder, err := internal.NewStreamContentDecoder(ssl.ContentEncoding, c)
if err != nil {
ssl.Log.Error("Read error: %v", err)
}
scnr := bufio.NewScanner(decoder)
for { for {
if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 { if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 {
c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration)) c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration))
@ -120,11 +125,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
break break
} }
body, err := ssl.decoder.Decode(scnr.Bytes()) body := scnr.Bytes()
if err != nil {
ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error())
continue
}
metrics, err := ssl.Parse(body) metrics, err := ssl.Parse(body)
if err != nil { if err != nil {
@ -149,6 +150,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
type packetSocketListener struct { type packetSocketListener struct {
net.PacketConn net.PacketConn
*SocketListener *SocketListener
decoder internal.ContentDecoder
} }
func (psl *packetSocketListener) listen() { func (psl *packetSocketListener) listen() {
@ -196,7 +198,6 @@ type SocketListener struct {
parsers.Parser parsers.Parser
telegraf.Accumulator telegraf.Accumulator
io.Closer io.Closer
decoder internal.ContentDecoder
} }
func (sl *SocketListener) Description() string { func (sl *SocketListener) Description() string {
@ -283,12 +284,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
protocol := spl[0] protocol := spl[0]
addr := spl[1] addr := spl[1]
var err error
sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}
if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" { if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
// no good way of testing for "file does not exist". // no good way of testing for "file does not exist".
// Instead just ignore error and blow up when we try to listen, which will // Instead just ignore error and blow up when we try to listen, which will
@ -298,16 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
switch protocol { switch protocol {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket": case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
var (
err error
l net.Listener
)
tlsCfg, err := sl.ServerConfig.TLSConfig() tlsCfg, err := sl.ServerConfig.TLSConfig()
if err != nil { if err != nil {
return err return err
} }
var l net.Listener
if tlsCfg == nil { if tlsCfg == nil {
l, err = net.Listen(protocol, addr) l, err = net.Listen(protocol, addr)
} else { } else {
@ -344,6 +335,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
ssl.listen() ssl.listen()
}() }()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
decoder, err := internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}
pc, err := udpListen(protocol, addr) pc, err := udpListen(protocol, addr)
if err != nil { if err != nil {
return err return err
@ -373,6 +369,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
psl := &packetSocketListener{ psl := &packetSocketListener{
PacketConn: pc, PacketConn: pc,
SocketListener: sl, SocketListener: sl,
decoder: decoder,
} }
sl.Closer = psl sl.Closer = psl

View File

@ -222,7 +222,7 @@ func TestSocketListenerDecode_udp(t *testing.T) {
func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n") mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
mstr3 := []byte("test,foo=zab v=3i 123456791") mstr3 := []byte("test,foo=zab v=3i 123456791\n")
if sl.ContentEncoding == "gzip" { if sl.ContentEncoding == "gzip" {
encoder, err := internal.NewContentEncoder(sl.ContentEncoding) encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
@ -238,10 +238,6 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
client.Write(mstr12) client.Write(mstr12)
client.Write(mstr3) client.Write(mstr3)
if client.LocalAddr().Network() != "udp" {
// stream connection. needs trailing newline to terminate mstr3
client.Write([]byte{'\n'})
}
acc := sl.Accumulator.(*testutil.Accumulator) acc := sl.Accumulator.(*testutil.Accumulator)