diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 9d3a8e1fe..b5c0202cc 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -6,6 +6,7 @@ import ( "io" "log" "net" + "os" "strings" "sync" @@ -32,7 +33,9 @@ func (ssl *streamSocketListener) listen() { for { c, err := ssl.Accept() if err != nil { - ssl.AddError(err) + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + ssl.AddError(err) + } break } @@ -78,7 +81,9 @@ func (ssl *streamSocketListener) read(c net.Conn) { } if err := scnr.Err(); err != nil { - ssl.AddError(err) + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + ssl.AddError(err) + } } } @@ -92,7 +97,9 @@ func (psl *packetSocketListener) listen() { for { n, _, err := psl.ReadFrom(buf) if err != nil { - psl.AddError(err) + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + psl.AddError(err) + } break } @@ -170,6 +177,13 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) } + if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { + // no good way of testing for "file does not exist". + // Instead just ignore error and blow up when we try to listen, which will + // indicate "address already in use" if file existed and we couldn't remove. + os.Remove(spl[1]) + } + switch spl[0] { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": l, err := net.Listen(spl[0], spl[1]) @@ -217,6 +231,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return fmt.Errorf("unknown protocol '%s' in '%s'", spl[0], sl.ServiceAddress) } + if spl[0] == "unix" || spl[0] == "unixpacket" || spl[0] == "unixgram" { + sl.Closer = unixCloser{path: spl[1], closer: sl.Closer} + } + return nil } @@ -235,6 +253,17 @@ func newSocketListener() *SocketListener { } } +type unixCloser struct { + path string + closer io.Closer +} + +func (uc unixCloser) Close() error { + err := uc.closer.Close() + os.Remove(uc.path) // ignore error + return err +} + func init() { inputs.Add("socket_listener", func() telegraf.Input { return newSocketListener() }) } diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 9fa472809..b263e5082 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -18,6 +18,7 @@ func TestSocketListener_tcp(t *testing.T) { acc := &testutil.Accumulator{} err := sl.Start(acc) require.NoError(t, err) + defer sl.Stop() client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String()) require.NoError(t, err) @@ -32,6 +33,7 @@ func TestSocketListener_udp(t *testing.T) { acc := &testutil.Accumulator{} err := sl.Start(acc) require.NoError(t, err) + defer sl.Stop() client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) require.NoError(t, err) @@ -40,13 +42,14 @@ func TestSocketListener_udp(t *testing.T) { } func TestSocketListener_unix(t *testing.T) { - defer os.Remove("/tmp/telegraf_test.sock") + os.Create("/tmp/telegraf_test.sock") sl := newSocketListener() sl.ServiceAddress = "unix:///tmp/telegraf_test.sock" acc := &testutil.Accumulator{} err := sl.Start(acc) require.NoError(t, err) + defer sl.Stop() client, err := net.Dial("unix", "/tmp/telegraf_test.sock") require.NoError(t, err) @@ -55,13 +58,14 @@ func TestSocketListener_unix(t *testing.T) { } func TestSocketListener_unixgram(t *testing.T) { - defer os.Remove("/tmp/telegraf_test.sock") + os.Create("/tmp/telegraf_test.sock") sl := newSocketListener() sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock" acc := &testutil.Accumulator{} err := sl.Start(acc) require.NoError(t, err) + defer sl.Stop() client, err := net.Dial("unixgram", "/tmp/telegraf_test.sock") require.NoError(t, err) diff --git a/plugins/outputs/socket_writer/socket_writer_test.go b/plugins/outputs/socket_writer/socket_writer_test.go index 3ab9d1e34..6be2b0905 100644 --- a/plugins/outputs/socket_writer/socket_writer_test.go +++ b/plugins/outputs/socket_writer/socket_writer_test.go @@ -44,6 +44,7 @@ func TestSocketWriter_udp(t *testing.T) { } func TestSocketWriter_unix(t *testing.T) { + os.Remove("/tmp/telegraf_test.sock") defer os.Remove("/tmp/telegraf_test.sock") listener, err := net.Listen("unix", "/tmp/telegraf_test.sock") require.NoError(t, err) @@ -61,6 +62,7 @@ func TestSocketWriter_unix(t *testing.T) { } func TestSocketWriter_unixgram(t *testing.T) { + os.Remove("/tmp/telegraf_test.sock") defer os.Remove("/tmp/telegraf_test.sock") listener, err := net.ListenPacket("unixgram", "/tmp/telegraf_test.sock") require.NoError(t, err)