telegraf/plugins/inputs/socket_listener/socket_listener_test.go

266 lines
6.5 KiB
Go

package socket_listener
import (
"bytes"
"crypto/tls"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/wlog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// pki provides the TLS server and client configurations used by the TLS
// tests in this file. It is created from the testutil/pki directory,
// addressed relative to this package.
var pki = testutil.NewPKI("../../../testutil/pki")
// testEmptyLog redirects the standard logger into an in-memory buffer,
// wrapped in a wlog writer with the level set to WARN.
//
// Call it at the start of a test and run the returned function at the end
// (typically via defer). The returned function points the standard logger
// back at os.Stderr and then reports a test failure for every complete,
// newline-terminated line that was captured. Reading the buffer is expected
// to finish with io.EOF; any other error is reported as well.
func testEmptyLog(t *testing.T) func() {
	captured := new(bytes.Buffer)
	log.SetOutput(wlog.NewWriter(captured))
	wlog.SetLevel(wlog.WARN)

	verify := func() {
		log.SetOutput(os.Stderr)

		// Walk the captured output line by line until the buffer is drained.
		line, err := captured.ReadBytes('\n')
		for ; err == nil; line, err = captured.ReadBytes('\n') {
			assert.Empty(t, string(line), "log not empty")
		}
		assert.Equal(t, io.EOF, err)
	}
	return verify
}
// TestSocketListener_tcp_tls starts a listener on an ephemeral loopback TCP
// port with the test PKI's server TLS configuration, connects to it with a
// TLS client built from the same PKI, and runs the common metric checks in
// testSocketListener over that connection.
func TestSocketListener_tcp_tls(t *testing.T) {
	defer testEmptyLog(t)()

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "tcp://127.0.0.1:0"
	sl.ServerConfig = *pki.TLSServerConfig()

	acc := &testutil.Accumulator{}
	err := sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	tlsCfg, err := pki.TLSClientConfig().TLSConfig()
	require.NoError(t, err)

	// Port 0 was requested, so the real address is read back from the listener.
	secureClient, err := tls.Dial("tcp", sl.Closer.(net.Listener).Addr().String(), tlsCfg)
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer secureClient.Close()

	testSocketListener(t, sl, secureClient)
}
// TestSocketListener_unix_tls starts a TLS-enabled listener on a unix stream
// socket inside a temporary directory, connects with a TLS client, and runs
// the common metric checks in testSocketListener over that connection.
func TestSocketListener_unix_tls(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "telegraf")
	require.NoError(t, err)
	defer os.RemoveAll(tmpdir)
	sock := filepath.Join(tmpdir, "sl.TestSocketListener_unix_tls.sock")

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "unix://" + sock
	sl.ServerConfig = *pki.TLSServerConfig()

	acc := &testutil.Accumulator{}
	err = sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	// Check the error before touching tlsCfg: on failure the returned config
	// must not be used.
	tlsCfg, err := pki.TLSClientConfig().TLSConfig()
	require.NoError(t, err)
	// NOTE(review): verification is skipped here, presumably because the
	// certificate cannot be matched against a unix socket path — confirm.
	tlsCfg.InsecureSkipVerify = true

	secureClient, err := tls.Dial("unix", sock, tlsCfg)
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer secureClient.Close()

	testSocketListener(t, sl, secureClient)
}
// TestSocketListener_tcp starts a plain TCP listener on an ephemeral
// loopback port with ReadBufferSize set to 1024, dials it, and runs the
// common metric checks in testSocketListener over that connection.
func TestSocketListener_tcp(t *testing.T) {
	defer testEmptyLog(t)()

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "tcp://127.0.0.1:0"
	sl.ReadBufferSize = internal.Size{Size: 1024}

	acc := &testutil.Accumulator{}
	err := sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	// Port 0 was requested, so the real address is read back from the listener.
	client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// TestSocketListener_udp starts a UDP listener on an ephemeral loopback port
// with ReadBufferSize set to 1024, dials it, and runs the common metric
// checks in testSocketListener over that connection.
func TestSocketListener_udp(t *testing.T) {
	defer testEmptyLog(t)()

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "udp://127.0.0.1:0"
	sl.ReadBufferSize = internal.Size{Size: 1024}

	acc := &testutil.Accumulator{}
	err := sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	// Port 0 was requested, so the real address is read back from the
	// packet connection.
	client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// TestSocketListener_unix starts a listener on a unix stream socket inside a
// temporary directory, dials it, and runs the common metric checks in
// testSocketListener over that connection. A regular file is created at the
// socket path before the listener starts, so Start has to succeed with the
// path already occupied.
func TestSocketListener_unix(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "telegraf")
	require.NoError(t, err)
	defer os.RemoveAll(tmpdir)
	sock := filepath.Join(tmpdir, "sl.TestSocketListener_unix.sock")

	defer testEmptyLog(t)()

	// Occupy the socket path up front. Only the file's existence matters,
	// so the handle is closed right away instead of being leaked.
	f, err := os.Create(sock)
	require.NoError(t, err)
	require.NoError(t, f.Close())

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "unix://" + sock
	sl.ReadBufferSize = internal.Size{Size: 1024}

	acc := &testutil.Accumulator{}
	err = sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	client, err := net.Dial("unix", sock)
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// TestSocketListener_unixgram starts a listener on a unix datagram socket
// inside a temporary directory, dials it, and runs the common metric checks
// in testSocketListener over that connection. A regular file is created at
// the socket path before the listener starts, so Start has to succeed with
// the path already occupied.
func TestSocketListener_unixgram(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "telegraf")
	require.NoError(t, err)
	defer os.RemoveAll(tmpdir)
	sock := filepath.Join(tmpdir, "sl.TestSocketListener_unixgram.sock")

	defer testEmptyLog(t)()

	// Occupy the socket path up front. Only the file's existence matters,
	// so the handle is closed right away instead of being leaked.
	f, err := os.Create(sock)
	require.NoError(t, err)
	require.NoError(t, f.Close())

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "unixgram://" + sock
	sl.ReadBufferSize = internal.Size{Size: 1024}

	acc := &testutil.Accumulator{}
	err = sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	client, err := net.Dial("unixgram", sock)
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// TestSocketListenerDecode_tcp starts a TCP listener on an ephemeral loopback
// port with ContentEncoding set to "gzip", dials it, and runs the common
// metric checks in testSocketListener, which gzip-encodes the payloads it
// sends when that encoding is configured.
func TestSocketListenerDecode_tcp(t *testing.T) {
	defer testEmptyLog(t)()

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "tcp://127.0.0.1:0"
	sl.ReadBufferSize = internal.Size{Size: 1024}
	sl.ContentEncoding = "gzip"

	acc := &testutil.Accumulator{}
	err := sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	// Port 0 was requested, so the real address is read back from the listener.
	client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// TestSocketListenerDecode_udp starts a UDP listener on an ephemeral loopback
// port with ContentEncoding set to "gzip", dials it, and runs the common
// metric checks in testSocketListener, which gzip-encodes the payloads it
// sends when that encoding is configured.
func TestSocketListenerDecode_udp(t *testing.T) {
	defer testEmptyLog(t)()

	sl := newSocketListener()
	sl.Log = testutil.Logger{}
	sl.ServiceAddress = "udp://127.0.0.1:0"
	sl.ReadBufferSize = internal.Size{Size: 1024}
	sl.ContentEncoding = "gzip"

	acc := &testutil.Accumulator{}
	err := sl.Start(acc)
	require.NoError(t, err)
	defer sl.Stop()

	// Port 0 was requested, so the real address is read back from the
	// packet connection.
	client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
	require.NoError(t, err)
	// Release the client socket as well. Deferred calls run in reverse
	// order, so the client is closed before the listener is stopped.
	defer client.Close()

	testSocketListener(t, sl, client)
}
// testSocketListener drives a started listener through client and verifies
// what arrives in its accumulator.
//
// It writes two payloads to client: the first carries two metric lines, the
// second carries one. When sl.ContentEncoding is "gzip", each payload is
// gzip-encoded separately before it is written. It then waits until the
// accumulator holds three metrics and asserts the measurement, tags, fields
// and timestamp of each, in the order they were sent.
//
// sl.Accumulator must hold a *testutil.Accumulator; the type assertion
// panics otherwise. The client connection is not closed here.
func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
	mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
	mstr3 := []byte("test,foo=zab v=3i 123456791\n")

	if sl.ContentEncoding == "gzip" {
		encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
		require.NoError(t, err)
		mstr12, err = encoder.Encode(mstr12)
		require.NoError(t, err)

		encoder, err = internal.NewContentEncoder(sl.ContentEncoding)
		require.NoError(t, err)
		mstr3, err = encoder.Encode(mstr3)
		require.NoError(t, err)
	}

	// Fail right away on a write error: if a payload never reaches the
	// listener, the Wait below would block instead of reporting the cause.
	_, err := client.Write(mstr12)
	require.NoError(t, err)
	_, err = client.Write(mstr3)
	require.NoError(t, err)

	acc := sl.Accumulator.(*testutil.Accumulator)

	acc.Wait(3)
	// Take the three metrics while holding the accumulator's lock.
	acc.Lock()
	m1 := acc.Metrics[0]
	m2 := acc.Metrics[1]
	m3 := acc.Metrics[2]
	acc.Unlock()

	assert.Equal(t, "test", m1.Measurement)
	assert.Equal(t, map[string]string{"foo": "bar"}, m1.Tags)
	assert.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields)
	assert.True(t, time.Unix(0, 123456789).Equal(m1.Time))

	assert.Equal(t, "test", m2.Measurement)
	assert.Equal(t, map[string]string{"foo": "baz"}, m2.Tags)
	assert.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields)
	assert.True(t, time.Unix(0, 123456790).Equal(m2.Time))

	assert.Equal(t, "test", m3.Measurement)
	assert.Equal(t, map[string]string{"foo": "zab"}, m3.Tags)
	assert.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields)
	assert.True(t, time.Unix(0, 123456791).Equal(m3.Time))
}