fix randomly failing CI test (#7514)

This commit is contained in:
Steven Soroka 2020-05-21 16:53:07 -04:00 committed by GitHub
parent 58ad64a43b
commit d4e9fd15ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 16 deletions

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
@ -47,14 +46,14 @@ type Execd struct {
Signal string Signal string
RestartDelay config.Duration RestartDelay config.Duration
acc telegraf.Accumulator acc telegraf.Accumulator
cmd *exec.Cmd cmd *exec.Cmd
parser parsers.Parser parser parsers.Parser
stdin io.WriteCloser stdin io.WriteCloser
stdout io.ReadCloser stdout io.ReadCloser
stderr io.ReadCloser stderr io.ReadCloser
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup mainLoopWg sync.WaitGroup
} }
func (e *Execd) SampleConfig() string { func (e *Execd) SampleConfig() string {
@ -76,7 +75,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("FATAL no command specified") return fmt.Errorf("FATAL no command specified")
} }
e.wg.Add(1) // for the main loop e.mainLoopWg.Add(1)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel e.cancel = cancel
@ -86,16 +85,19 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
} }
go func() { go func() {
e.cmdLoop(ctx) if err := e.cmdLoop(ctx); err != nil {
e.wg.Done() log.Printf("Process quit with message: %s", err.Error())
}
e.mainLoopWg.Done()
}() }()
return nil return nil
} }
func (e *Execd) Stop() { func (e *Execd) Stop() {
// don't try to stop before all stream readers have started.
e.cancel() e.cancel()
e.wg.Wait() e.mainLoopWg.Wait()
} }
// cmdLoop watches an already running process, restarting it when appropriate. // cmdLoop watches an already running process, restarting it when appropriate.
@ -112,9 +114,7 @@ func (e *Execd) cmdLoop(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
if e.stdin != nil { if e.stdin != nil {
e.stdin.Close() e.stdin.Close()
// Immediately exit process but with a graceful shutdown gracefulStop(e.cmd, 5*time.Second)
// period before killing
internal.WaitTimeout(e.cmd, 200*time.Millisecond)
} }
return nil return nil
case err := <-done: case err := <-done:

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec"
"syscall" "syscall"
"time" "time"
@ -38,3 +39,11 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
cmd.Process.Signal(syscall.SIGTERM)
go func() {
<-time.NewTimer(timeout).C
cmd.Process.Kill()
}()
}

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -31,3 +32,7 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
cmd.Process.Kill()
}