diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index c83f3eb68..d81c45994 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -242,14 +242,17 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) } - if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { + protocol := spl[0] + addr := spl[1] + + if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" { // no good way of testing for "file does not exist". // Instead just ignore error and blow up when we try to listen, which will // indicate "address already in use" if file existed and we couldn't remove. - os.Remove(spl[1]) + os.Remove(addr) } - switch spl[0] { + switch protocol { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": var ( err error @@ -262,14 +265,16 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { } if tlsCfg == nil { - l, err = net.Listen(spl[0], spl[1]) + l, err = net.Listen(protocol, addr) } else { - l, err = tls.Listen(spl[0], spl[1], tlsCfg) + l, err = tls.Listen(protocol, addr, tlsCfg) } if err != nil { return err } + log.Printf("I! [inputs.socket_listener] Listening on %s://%s", protocol, l.Addr()) + ssl := &streamSocketListener{ Listener: l, SocketListener: sl, @@ -279,7 +284,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { sl.Closer = ssl go ssl.listen() case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": - pc, err := net.ListenPacket(spl[0], spl[1]) + pc, err := net.ListenPacket(protocol, addr) if err != nil { return err } @@ -288,10 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { if srb, ok := pc.(setReadBufferer); ok { srb.SetReadBuffer(int(sl.ReadBufferSize.Size)) } else { - log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) + log.Printf("W! Unable to set read buffer on a %s socket", protocol) } } + log.Printf("I! [inputs.socket_listener] Listening on %s://%s", protocol, pc.LocalAddr()) + psl := &packetSocketListener{ PacketConn: pc, SocketListener: sl, @@ -300,10 +307,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { sl.Closer = psl go psl.listen() default: - return fmt.Errorf("unknown protocol '%s' in '%s'", spl[0], sl.ServiceAddress) + return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress) } - if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { + if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" { sl.Closer = unixCloser{path: spl[1], closer: sl.Closer} } diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index ae7fef8b9..b4415e092 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -3,6 +3,7 @@ package socket_listener import ( "bytes" "crypto/tls" + "io" "io/ioutil" "log" "net" @@ -13,6 +14,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/wlog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -23,11 +25,22 @@ var pki = testutil.NewPKI("../../../testutil/pki") // Should be called at the start of the test, and returns a function which should run at the end. func testEmptyLog(t *testing.T) func() { buf := bytes.NewBuffer(nil) - log.SetOutput(buf) + log.SetOutput(wlog.NewWriter(buf)) + + level := wlog.WARN + wlog.SetLevel(level) return func() { log.SetOutput(os.Stderr) - assert.Empty(t, string(buf.Bytes()), "log not empty") + + for { + line, err := buf.ReadBytes('\n') + if err != nil { + assert.Equal(t, io.EOF, err) + break + } + assert.Empty(t, string(line), "log not empty") + } } }