Add ContentEncoder to socket_writer for datagram sockets (#7417)

This commit is contained in:
William Austin 2020-04-30 13:21:34 -07:00 committed by GitHub
parent 07c6b78c8f
commit 59acbd4f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 9 deletions

View File

@ -32,6 +32,11 @@ It can output data in any of the [supported output formats](https://github.com/i
## Defaults to the OS configuration. ## Defaults to the OS configuration.
# keep_alive_period = "5m" # keep_alive_period = "5m"
## Content encoding for message payloads, can be set to "gzip" or to
## "identity" to apply no encoding.
##
# content_encoding = "identity"
## Data format to generate. ## Data format to generate.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -15,12 +15,15 @@ import (
) )
type SocketWriter struct { type SocketWriter struct {
ContentEncoding string `toml:"content_encoding"`
Address string Address string
KeepAlivePeriod *internal.Duration KeepAlivePeriod *internal.Duration
tlsint.ClientConfig tlsint.ClientConfig
serializers.Serializer serializers.Serializer
encoder internal.ContentEncoder
net.Conn net.Conn
} }
@ -55,6 +58,11 @@ func (sw *SocketWriter) SampleConfig() string {
## Defaults to the OS configuration. ## Defaults to the OS configuration.
# keep_alive_period = "5m" # keep_alive_period = "5m"
## Content encoding for packet-based connections (i.e. UDP, unixgram).
## Can be set to "gzip" or to "identity" to apply no encoding.
##
# content_encoding = "identity"
## Data format to generate. ## Data format to generate.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:
@ -91,6 +99,11 @@ func (sw *SocketWriter) Connect() error {
if err := sw.setKeepAlive(c); err != nil { if err := sw.setKeepAlive(c); err != nil {
log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) log.Printf("unable to configure keep alive (%s): %s", sw.Address, err)
} }
//set encoder
sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding)
if err != nil {
return err
}
sw.Conn = c sw.Conn = c
return nil return nil
@ -130,6 +143,13 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err) log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err)
continue continue
} }
bs, err = sw.encoder.Encode(bs)
if err != nil {
log.Printf("D! [outputs.socket_writer] Could not encode metric: %v", err)
continue
}
if _, err := sw.Conn.Write(bs); err != nil { if _, err := sw.Conn.Write(bs); err != nil {
//TODO log & keep going with remaining strings //TODO log & keep going with remaining strings
if err, ok := err.(net.Error); !ok || !err.Temporary() { if err, ok := err.(net.Error); !ok || !err.Temporary() {

View File

@ -2,7 +2,6 @@ package socket_writer
import ( import (
"bufio" "bufio"
"bytes"
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
@ -88,8 +87,10 @@ func testSocketWriter_stream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
metrics := []telegraf.Metric{} metrics := []telegraf.Metric{}
metrics = append(metrics, testutil.TestMetric(1, "test")) metrics = append(metrics, testutil.TestMetric(1, "test"))
mbs1out, _ := sw.Serialize(metrics[0]) mbs1out, _ := sw.Serialize(metrics[0])
mbs1out, _ = sw.encoder.Encode(mbs1out)
metrics = append(metrics, testutil.TestMetric(2, "test")) metrics = append(metrics, testutil.TestMetric(2, "test"))
mbs2out, _ := sw.Serialize(metrics[1]) mbs2out, _ := sw.Serialize(metrics[1])
mbs2out, _ = sw.encoder.Encode(mbs2out)
err := sw.Write(metrics) err := sw.Write(metrics)
require.NoError(t, err) require.NoError(t, err)
@ -108,8 +109,12 @@ func testSocketWriter_packet(t *testing.T, sw *SocketWriter, lconn net.PacketCon
metrics := []telegraf.Metric{} metrics := []telegraf.Metric{}
metrics = append(metrics, testutil.TestMetric(1, "test")) metrics = append(metrics, testutil.TestMetric(1, "test"))
mbs1out, _ := sw.Serialize(metrics[0]) mbs1out, _ := sw.Serialize(metrics[0])
mbs1out, _ = sw.encoder.Encode(mbs1out)
mbs1str := string(mbs1out)
metrics = append(metrics, testutil.TestMetric(2, "test")) metrics = append(metrics, testutil.TestMetric(2, "test"))
mbs2out, _ := sw.Serialize(metrics[1]) mbs2out, _ := sw.Serialize(metrics[1])
mbs2out, _ = sw.encoder.Encode(mbs2out)
mbs2str := string(mbs2out)
err := sw.Write(metrics) err := sw.Write(metrics)
require.NoError(t, err) require.NoError(t, err)
@ -119,17 +124,12 @@ func testSocketWriter_packet(t *testing.T, sw *SocketWriter, lconn net.PacketCon
for len(mstrins) < 2 { for len(mstrins) < 2 {
n, _, err := lconn.ReadFrom(buf) n, _, err := lconn.ReadFrom(buf)
require.NoError(t, err) require.NoError(t, err)
for _, bs := range bytes.Split(buf[:n], []byte{'\n'}) { mstrins = append(mstrins, string(buf[:n]))
if len(bs) == 0 {
continue
}
mstrins = append(mstrins, string(bs)+"\n")
}
} }
require.Len(t, mstrins, 2) require.Len(t, mstrins, 2)
assert.Equal(t, string(mbs1out), mstrins[0]) assert.Equal(t, mbs1str, mstrins[0])
assert.Equal(t, string(mbs2out), mstrins[1]) assert.Equal(t, mbs2str, mstrins[1])
} }
func TestSocketWriter_Write_err(t *testing.T) { func TestSocketWriter_Write_err(t *testing.T) {
@ -195,3 +195,17 @@ func TestSocketWriter_Write_reconnect(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(mbsout), string(buf[:n])) assert.Equal(t, string(mbsout), string(buf[:n]))
} }
func TestSocketWriter_udp_gzip(t *testing.T) {
listener, err := net.ListenPacket("udp", "127.0.0.1:0")
require.NoError(t, err)
sw := newSocketWriter()
sw.Address = "udp://" + listener.LocalAddr().String()
sw.ContentEncoding = "gzip"
err = sw.Connect()
require.NoError(t, err)
testSocketWriter_packet(t, sw, listener)
}