From d4e9fd15ce97467ca1ee9bac6e77d5df95e8628d Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Thu, 21 May 2020 16:53:07 -0400 Subject: [PATCH] fix randomly failing CI test (#7514) --- plugins/inputs/execd/execd.go | 32 +++++++++++++-------------- plugins/inputs/execd/execd_posix.go | 9 ++++++++ plugins/inputs/execd/execd_windows.go | 5 +++++ 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index 1ea136a3d..ca9e589d9 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" @@ -47,14 +46,14 @@ type Execd struct { Signal string RestartDelay config.Duration - acc telegraf.Accumulator - cmd *exec.Cmd - parser parsers.Parser - stdin io.WriteCloser - stdout io.ReadCloser - stderr io.ReadCloser - cancel context.CancelFunc - wg sync.WaitGroup + acc telegraf.Accumulator + cmd *exec.Cmd + parser parsers.Parser + stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser + cancel context.CancelFunc + mainLoopWg sync.WaitGroup } func (e *Execd) SampleConfig() string { @@ -76,7 +75,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { return fmt.Errorf("FATAL no command specified") } - e.wg.Add(1) // for the main loop + e.mainLoopWg.Add(1) ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel @@ -86,16 +85,19 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { } go func() { - e.cmdLoop(ctx) - e.wg.Done() + if err := e.cmdLoop(ctx); err != nil { + log.Printf("Process quit with message: %s", err.Error()) + } + e.mainLoopWg.Done() }() return nil } func (e *Execd) Stop() { + // don't try to stop before all stream readers have started. e.cancel() - e.wg.Wait() + e.mainLoopWg.Wait() } // cmdLoop watches an already running process, restarting it when appropriate. @@ -112,9 +114,7 @@ func (e *Execd) cmdLoop(ctx context.Context) error { case <-ctx.Done(): if e.stdin != nil { e.stdin.Close() - // Immediately exit process but with a graceful shutdown - // period before killing - internal.WaitTimeout(e.cmd, 200*time.Millisecond) + gracefulStop(e.cmd, 5*time.Second) } return nil case err := <-done: diff --git a/plugins/inputs/execd/execd_posix.go b/plugins/inputs/execd/execd_posix.go index d2389c52f..cc3a8e8bb 100644 --- a/plugins/inputs/execd/execd_posix.go +++ b/plugins/inputs/execd/execd_posix.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "os/exec" "syscall" "time" @@ -38,3 +39,11 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } + +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + cmd.Process.Signal(syscall.SIGTERM) + go func() { + <-time.NewTimer(timeout).C + cmd.Process.Kill() + }() +} diff --git a/plugins/inputs/execd/execd_windows.go b/plugins/inputs/execd/execd_windows.go index c0dc0e846..82935d4ac 100644 --- a/plugins/inputs/execd/execd_windows.go +++ b/plugins/inputs/execd/execd_windows.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "os/exec" "time" "github.com/influxdata/telegraf" @@ -31,3 +32,7 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } + +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + cmd.Process.Kill() +}