Add decoding and tests to socket_listener (#6660)
This commit is contained in:
parent
a000ad3553
commit
32d1e71a7e
|
@ -66,6 +66,10 @@ This is a sample configuration for the plugin.
|
|||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
# data_format = "influx"
|
||||
|
||||
## Content encoding for message payloads, can be set to "gzip" to or
|
||||
## "identity" to apply no encoding.
|
||||
# content_encoding = "identity"
|
||||
```
|
||||
|
||||
## A Note on UDP OS Buffer Sizes
|
||||
|
@ -84,6 +88,7 @@ at least 8MB before trying to run large amounts of UDP traffic to your instance.
|
|||
8MB is just a recommendation, and can be adjusted higher.
|
||||
|
||||
### Linux
|
||||
|
||||
Check the current UDP/IP receive buffer limit & default by typing the following
|
||||
commands:
|
||||
|
||||
|
|
|
@ -119,7 +119,14 @@ func (ssl *streamSocketListener) read(c net.Conn) {
|
|||
if !scnr.Scan() {
|
||||
break
|
||||
}
|
||||
metrics, err := ssl.Parse(scnr.Bytes())
|
||||
|
||||
body, err := ssl.decoder.Decode(scnr.Bytes())
|
||||
if err != nil {
|
||||
ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
metrics, err := ssl.Parse(body)
|
||||
if err != nil {
|
||||
ssl.Log.Errorf("Unable to parse incoming line: %s", err.Error())
|
||||
// TODO rate limit
|
||||
|
@ -155,7 +162,12 @@ func (psl *packetSocketListener) listen() {
|
|||
break
|
||||
}
|
||||
|
||||
metrics, err := psl.Parse(buf[:n])
|
||||
body, err := psl.decoder.Decode(buf[:n])
|
||||
if err != nil {
|
||||
psl.Log.Errorf("Unable to decode incoming packet: %s", err.Error())
|
||||
}
|
||||
|
||||
metrics, err := psl.Parse(body)
|
||||
if err != nil {
|
||||
psl.Log.Errorf("Unable to parse incoming packet: %s", err.Error())
|
||||
// TODO rate limit
|
||||
|
@ -174,6 +186,7 @@ type SocketListener struct {
|
|||
ReadTimeout *internal.Duration `toml:"read_timeout"`
|
||||
KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"`
|
||||
SocketMode string `toml:"socket_mode"`
|
||||
ContentEncoding string `toml:"content_encoding"`
|
||||
tlsint.ServerConfig
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
@ -183,6 +196,7 @@ type SocketListener struct {
|
|||
parsers.Parser
|
||||
telegraf.Accumulator
|
||||
io.Closer
|
||||
decoder internal.ContentDecoder
|
||||
}
|
||||
|
||||
func (sl *SocketListener) Description() string {
|
||||
|
@ -244,6 +258,10 @@ func (sl *SocketListener) SampleConfig() string {
|
|||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
# data_format = "influx"
|
||||
|
||||
## Content encoding for message payloads, can be set to "gzip" to or
|
||||
## "identity" to apply no encoding.
|
||||
# content_encoding = "identity"
|
||||
`
|
||||
}
|
||||
|
||||
|
@ -265,6 +283,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
|||
protocol := spl[0]
|
||||
addr := spl[1]
|
||||
|
||||
var err error
|
||||
sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
|
||||
// no good way of testing for "file does not exist".
|
||||
// Instead just ignore error and blow up when we try to listen, which will
|
||||
|
|
|
@ -180,12 +180,65 @@ func TestSocketListener_unixgram(t *testing.T) {
|
|||
testSocketListener(t, sl, client)
|
||||
}
|
||||
|
||||
func TestSocketListenerDecode_tcp(t *testing.T) {
|
||||
defer testEmptyLog(t)()
|
||||
|
||||
sl := newSocketListener()
|
||||
sl.Log = testutil.Logger{}
|
||||
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
||||
sl.ReadBufferSize = internal.Size{Size: 1024}
|
||||
sl.ContentEncoding = "gzip"
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
err := sl.Start(acc)
|
||||
require.NoError(t, err)
|
||||
defer sl.Stop()
|
||||
|
||||
client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketListener(t, sl, client)
|
||||
}
|
||||
|
||||
func TestSocketListenerDecode_udp(t *testing.T) {
|
||||
defer testEmptyLog(t)()
|
||||
|
||||
sl := newSocketListener()
|
||||
sl.Log = testutil.Logger{}
|
||||
sl.ServiceAddress = "udp://127.0.0.1:0"
|
||||
sl.ReadBufferSize = internal.Size{Size: 1024}
|
||||
sl.ContentEncoding = "gzip"
|
||||
|
||||
acc := &testutil.Accumulator{}
|
||||
err := sl.Start(acc)
|
||||
require.NoError(t, err)
|
||||
defer sl.Stop()
|
||||
|
||||
client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
|
||||
require.NoError(t, err)
|
||||
|
||||
testSocketListener(t, sl, client)
|
||||
}
|
||||
|
||||
func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
|
||||
mstr12 := "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"
|
||||
mstr3 := "test,foo=zab v=3i 123456791"
|
||||
client.Write([]byte(mstr12))
|
||||
client.Write([]byte(mstr3))
|
||||
if _, ok := client.(net.Conn); ok {
|
||||
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
|
||||
mstr3 := []byte("test,foo=zab v=3i 123456791")
|
||||
|
||||
if sl.ContentEncoding == "gzip" {
|
||||
encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
|
||||
require.NoError(t, err)
|
||||
mstr12, err = encoder.Encode(mstr12)
|
||||
require.NoError(t, err)
|
||||
|
||||
encoder, err = internal.NewContentEncoder(sl.ContentEncoding)
|
||||
require.NoError(t, err)
|
||||
mstr3, err = encoder.Encode(mstr3)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
client.Write(mstr12)
|
||||
client.Write(mstr3)
|
||||
if client.LocalAddr().Network() != "udp" {
|
||||
// stream connection. needs trailing newline to terminate mstr3
|
||||
client.Write([]byte{'\n'})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue