From 59acbd4f1305909ac773ab8b4b621413907cb8bc Mon Sep 17 00:00:00 2001 From: William Austin Date: Thu, 30 Apr 2020 13:21:34 -0700 Subject: [PATCH] Add ContentEncoder to socket_writer for datagram sockets (#7417) --- plugins/outputs/socket_writer/README.md | 5 +++ .../outputs/socket_writer/socket_writer.go | 20 ++++++++++++ .../socket_writer/socket_writer_test.go | 32 +++++++++++++------ 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/plugins/outputs/socket_writer/README.md b/plugins/outputs/socket_writer/README.md index 149cda2a6..5dc9d0246 100644 --- a/plugins/outputs/socket_writer/README.md +++ b/plugins/outputs/socket_writer/README.md @@ -32,6 +32,11 @@ It can output data in any of the [supported output formats](https://github.com/i ## Defaults to the OS configuration. # keep_alive_period = "5m" + ## Content encoding for message payloads, can be set to "gzip" or to + ## "identity" to apply no encoding. + ## + # content_encoding = "identity" + ## Data format to generate. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/socket_writer/socket_writer.go b/plugins/outputs/socket_writer/socket_writer.go index 833122dfc..eb286d919 100644 --- a/plugins/outputs/socket_writer/socket_writer.go +++ b/plugins/outputs/socket_writer/socket_writer.go @@ -15,12 +15,15 @@ import ( ) type SocketWriter struct { + ContentEncoding string `toml:"content_encoding"` Address string KeepAlivePeriod *internal.Duration tlsint.ClientConfig serializers.Serializer + encoder internal.ContentEncoder + net.Conn } @@ -55,6 +58,11 @@ func (sw *SocketWriter) SampleConfig() string { ## Defaults to the OS configuration. # keep_alive_period = "5m" + ## Content encoding for packet-based connections (i.e. UDP, unixgram). + ## Can be set to "gzip" or to "identity" to apply no encoding. + ## + # content_encoding = "identity" + ## Data format to generate. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -91,6 +99,11 @@ func (sw *SocketWriter) Connect() error { if err := sw.setKeepAlive(c); err != nil { log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) } + //set encoder + sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding) + if err != nil { + return err + } sw.Conn = c return nil @@ -130,6 +143,13 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error { log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err) continue } + + bs, err = sw.encoder.Encode(bs) + if err != nil { + log.Printf("D! [outputs.socket_writer] Could not encode metric: %v", err) + continue + } + if _, err := sw.Conn.Write(bs); err != nil { //TODO log & keep going with remaining strings if err, ok := err.(net.Error); !ok || !err.Temporary() { diff --git a/plugins/outputs/socket_writer/socket_writer_test.go b/plugins/outputs/socket_writer/socket_writer_test.go index f7eb159ea..14b25e6c5 100644 --- a/plugins/outputs/socket_writer/socket_writer_test.go +++ b/plugins/outputs/socket_writer/socket_writer_test.go @@ -2,7 +2,6 @@ package socket_writer import ( "bufio" - "bytes" "io/ioutil" "net" "os" @@ -88,8 +87,10 @@ func testSocketWriter_stream(t *testing.T, sw *SocketWriter, lconn net.Conn) { metrics := []telegraf.Metric{} metrics = append(metrics, testutil.TestMetric(1, "test")) mbs1out, _ := sw.Serialize(metrics[0]) + mbs1out, _ = sw.encoder.Encode(mbs1out) metrics = append(metrics, testutil.TestMetric(2, "test")) mbs2out, _ := sw.Serialize(metrics[1]) + mbs2out, _ = sw.encoder.Encode(mbs2out) err := sw.Write(metrics) require.NoError(t, err) @@ -108,8 +109,12 @@ func testSocketWriter_packet(t *testing.T, sw *SocketWriter, lconn net.PacketCon metrics := []telegraf.Metric{} metrics = append(metrics, testutil.TestMetric(1, "test")) mbs1out, _ := sw.Serialize(metrics[0]) + mbs1out, _ = sw.encoder.Encode(mbs1out) + mbs1str := string(mbs1out) metrics = append(metrics, testutil.TestMetric(2, "test")) mbs2out, _ := sw.Serialize(metrics[1]) + mbs2out, _ = sw.encoder.Encode(mbs2out) + mbs2str := string(mbs2out) err := sw.Write(metrics) require.NoError(t, err) @@ -119,17 +124,12 @@ func testSocketWriter_packet(t *testing.T, sw *SocketWriter, lconn net.PacketCon for len(mstrins) < 2 { n, _, err := lconn.ReadFrom(buf) require.NoError(t, err) - for _, bs := range bytes.Split(buf[:n], []byte{'\n'}) { - if len(bs) == 0 { - continue - } - mstrins = append(mstrins, string(bs)+"\n") - } + mstrins = append(mstrins, string(buf[:n])) } require.Len(t, mstrins, 2) - assert.Equal(t, string(mbs1out), mstrins[0]) - assert.Equal(t, string(mbs2out), mstrins[1]) + assert.Equal(t, mbs1str, mstrins[0]) + assert.Equal(t, mbs2str, mstrins[1]) } func TestSocketWriter_Write_err(t *testing.T) { @@ -195,3 +195,17 @@ func TestSocketWriter_Write_reconnect(t *testing.T) { require.NoError(t, err) assert.Equal(t, string(mbsout), string(buf[:n])) } + +func TestSocketWriter_udp_gzip(t *testing.T) { + listener, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + sw := newSocketWriter() + sw.Address = "udp://" + listener.LocalAddr().String() + sw.ContentEncoding = "gzip" + + err = sw.Connect() + require.NoError(t, err) + + testSocketWriter_packet(t, sw, listener) +}