Log the protocol and address that socket_listener is listening on (#5454)

This commit is contained in:
Daniel Nelson 2019-02-25 11:11:25 -08:00 committed by GitHub
parent 0882479cbf
commit eb794ec30f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 11 deletions

View File

@ -242,14 +242,17 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
} }
if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { protocol := spl[0]
addr := spl[1]
if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
// no good way of testing for "file does not exist". // no good way of testing for "file does not exist".
// Instead just ignore error and blow up when we try to listen, which will // Instead just ignore error and blow up when we try to listen, which will
// indicate "address already in use" if file existed and we couldn't remove. // indicate "address already in use" if file existed and we couldn't remove.
os.Remove(spl[1]) os.Remove(addr)
} }
switch spl[0] { switch protocol {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket": case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
var ( var (
err error err error
@ -262,14 +265,16 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
} }
if tlsCfg == nil { if tlsCfg == nil {
l, err = net.Listen(spl[0], spl[1]) l, err = net.Listen(protocol, addr)
} else { } else {
l, err = tls.Listen(spl[0], spl[1], tlsCfg) l, err = tls.Listen(protocol, addr, tlsCfg)
} }
if err != nil { if err != nil {
return err return err
} }
log.Printf("I! [inputs.socket_listener] Listening on %s://%s", protocol, l.Addr())
ssl := &streamSocketListener{ ssl := &streamSocketListener{
Listener: l, Listener: l,
SocketListener: sl, SocketListener: sl,
@ -279,7 +284,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
sl.Closer = ssl sl.Closer = ssl
go ssl.listen() go ssl.listen()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
pc, err := net.ListenPacket(spl[0], spl[1]) pc, err := net.ListenPacket(protocol, addr)
if err != nil { if err != nil {
return err return err
} }
@ -288,10 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
if srb, ok := pc.(setReadBufferer); ok { if srb, ok := pc.(setReadBufferer); ok {
srb.SetReadBuffer(int(sl.ReadBufferSize.Size)) srb.SetReadBuffer(int(sl.ReadBufferSize.Size))
} else { } else {
log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) log.Printf("W! Unable to set read buffer on a %s socket", protocol)
} }
} }
log.Printf("I! [inputs.socket_listener] Listening on %s://%s", protocol, pc.LocalAddr())
psl := &packetSocketListener{ psl := &packetSocketListener{
PacketConn: pc, PacketConn: pc,
SocketListener: sl, SocketListener: sl,
@ -300,10 +307,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
sl.Closer = psl sl.Closer = psl
go psl.listen() go psl.listen()
default: default:
return fmt.Errorf("unknown protocol '%s' in '%s'", spl[0], sl.ServiceAddress) return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
} }
if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
sl.Closer = unixCloser{path: spl[1], closer: sl.Closer} sl.Closer = unixCloser{path: spl[1], closer: sl.Closer}
} }

View File

@ -3,6 +3,7 @@ package socket_listener
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
@ -13,6 +14,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/wlog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -23,11 +25,22 @@ var pki = testutil.NewPKI("../../../testutil/pki")
// Should be called at the start of the test, and returns a function which should run at the end. // Should be called at the start of the test, and returns a function which should run at the end.
func testEmptyLog(t *testing.T) func() { func testEmptyLog(t *testing.T) func() {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
log.SetOutput(buf) log.SetOutput(wlog.NewWriter(buf))
level := wlog.WARN
wlog.SetLevel(level)
return func() { return func() {
log.SetOutput(os.Stderr) log.SetOutput(os.Stderr)
assert.Empty(t, string(buf.Bytes()), "log not empty")
for {
line, err := buf.ReadBytes('\n')
if err != nil {
assert.Equal(t, io.EOF, err)
break
}
assert.Empty(t, string(line), "log not empty")
}
} }
} }