From cc927357a44b3bacab39260625eea405b089b6fc Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Tue, 5 May 2020 10:14:57 -0400 Subject: [PATCH] shim improvements for docs, clean quit, and slow readers (#7452) --- ...legraf_notwindows.go => telegraf_posix.go} | 0 plugins/inputs/execd/README.md | 21 +++-- plugins/inputs/execd/execd.go | 2 +- plugins/inputs/execd/execd_posix.go | 5 ++ plugins/inputs/execd/execd_test.go | 3 +- plugins/inputs/execd/execd_windows.go | 5 ++ plugins/inputs/execd/shim/goshim.go | 82 ++++++++++++------- .../inputs/execd/shim/goshim_notwindows.go | 14 ---- plugins/inputs/execd/shim/goshim_posix.go | 23 ++++++ plugins/inputs/execd/shim/goshim_windows.go | 11 ++- plugins/inputs/execd/shim/shim_posix_test.go | 38 ++++----- plugins/inputs/execd/shim/shim_test.go | 59 ++++++------- 12 files changed, 159 insertions(+), 104 deletions(-) rename cmd/telegraf/{telegraf_notwindows.go => telegraf_posix.go} (100%) delete mode 100644 plugins/inputs/execd/shim/goshim_notwindows.go create mode 100644 plugins/inputs/execd/shim/goshim_posix.go diff --git a/cmd/telegraf/telegraf_notwindows.go b/cmd/telegraf/telegraf_posix.go similarity index 100% rename from cmd/telegraf/telegraf_notwindows.go rename to cmd/telegraf/telegraf_posix.go diff --git a/plugins/inputs/execd/README.md b/plugins/inputs/execd/README.md index 022311924..f8709c8be 100644 --- a/plugins/inputs/execd/README.md +++ b/plugins/inputs/execd/README.md @@ -1,9 +1,13 @@ # Execd Input Plugin -The `execd` plugin runs an external program as a daemon. The programs must output metrics in any one of the accepted -[Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md) on its standard output. +The `execd` plugin runs an external program as a long-running daemon. +The programs must output metrics in any one of the accepted +[Input Data Formats](input_formats) on the process's STDOUT, and is expected to +stay running. 
If you'd instead like the process to collect metrics and then exit, +check out the [inputs.exec][exec_plugin] plugin. -The `signal` can be configured to send a signal the running daemon on each collection interval. +The `signal` can be configured to send a signal to the running daemon on each +collection interval. Program output on standard error is mirrored to the telegraf log. @@ -16,10 +20,10 @@ Program output on standard error is mirrored to the telegraf log. ## Define how the process is signaled on each collection interval. ## Valid values are: - ## "none" : Do not signal anything. - ## The process must output metrics by itself. - ## "STDIN" : Send a newline on STDIN. - ## "SIGHUP" : Send a HUP signal. Not available on Windows. + ## "none" : Do not signal anything. (Recommended for service inputs) + ## The process must output metrics by itself. + ## "STDIN" : Send a newline on STDIN. (Recommended for gather inputs) + ## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended) ## "SIGUSR1" : Send a USR1 signal. Not available on Windows. ## "SIGUSR2" : Send a USR2 signal. Not available on Windows. 
signal = "none" @@ -110,3 +114,6 @@ end command = ["plugins/inputs/execd/examples/count.rb"] signal = "none" ``` + +[input_formats]: https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +[exec_plugin]: https://github.com/influxdata/telegraf/blob/master/plugins/inputs/exec/README.md diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index 90a5ceffb..b162c9776 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -75,7 +75,7 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { return fmt.Errorf("FATAL no command specified") } - e.wg.Add(1) + e.wg.Add(1) // for the main loop ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel diff --git a/plugins/inputs/execd/execd_posix.go b/plugins/inputs/execd/execd_posix.go index 919447260..d2389c52f 100644 --- a/plugins/inputs/execd/execd_posix.go +++ b/plugins/inputs/execd/execd_posix.go @@ -5,7 +5,9 @@ package execd import ( "fmt" "io" + "os" "syscall" + "time" "github.com/influxdata/telegraf" ) @@ -23,6 +25,9 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { case "SIGUSR2": e.cmd.Process.Signal(syscall.SIGUSR2) case "STDIN": + if osStdin, ok := e.stdin.(*os.File); ok { + osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) + } if _, err := io.WriteString(e.stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index b78075e95..1c687a9df 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -33,11 +33,12 @@ func TestExternalInputWorks(t *testing.T) { require.NoError(t, e.Start(acc)) require.NoError(t, e.Gather(acc)) - e.Stop() // grab a metric and make sure it's a thing m := readChanWithTimeout(t, metrics, 10*time.Second) + e.Stop() + require.Equal(t, "counter_bash", m.Name()) val, ok := m.GetField("count") require.True(t, ok) diff --git 
a/plugins/inputs/execd/execd_windows.go b/plugins/inputs/execd/execd_windows.go index 443d8f686..c0dc0e846 100644 --- a/plugins/inputs/execd/execd_windows.go +++ b/plugins/inputs/execd/execd_windows.go @@ -5,6 +5,8 @@ package execd import ( "fmt" "io" + "os" + "time" "github.com/influxdata/telegraf" ) @@ -16,6 +18,9 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { switch e.Signal { case "STDIN": + if osStdin, ok := e.stdin.(*os.File); ok { + osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) + } if _, err := io.WriteString(e.stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index cd0c4ddec..3741d2b80 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -23,9 +23,9 @@ import ( type empty struct{} var ( - gatherPromptChans []chan empty - stdout io.Writer = os.Stdout - stdin io.Reader = os.Stdin + stdout io.Writer = os.Stdout + stdin io.Reader = os.Stdin + forever = 100 * 365 * 24 * time.Hour ) const ( @@ -34,10 +34,15 @@ const ( PollIntervalDisabled = time.Duration(0) ) +// Shim allows you to wrap your inputs and run them as if they were part of Telegraf, +// except built externally. type Shim struct { - Inputs []telegraf.Input + Inputs []telegraf.Input + gatherPromptChans []chan empty + metricCh chan telegraf.Metric } +// New creates a new shim interface func New() *Shim { return &Shim{} } @@ -67,25 +72,26 @@ func (s *Shim) AddInputs(newInputs []telegraf.Input) error { // Run the input plugins.. func (s *Shim) Run(pollInterval time.Duration) error { + // context is used only to close the stdin reader. everything else cascades + // from that point and closes cleanly when it's done. 
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.metricCh = make(chan telegraf.Metric, 1) + wg := sync.WaitGroup{} quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) collectMetricsPrompt := make(chan os.Signal, 1) - listenForCollectMetricsSignals(collectMetricsPrompt) - - wg.Add(1) // wait for the metric channel to close - metricCh := make(chan telegraf.Metric, 1) + listenForCollectMetricsSignals(ctx, collectMetricsPrompt) serializer := influx.NewSerializer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - for _, input := range s.Inputs { wrappedInput := inputShim{Input: input} - acc := agent.NewAccumulator(wrappedInput, metricCh) + acc := agent.NewAccumulator(wrappedInput, s.metricCh) acc.SetPrecision(time.Nanosecond) if serviceInput, ok := input.(telegraf.ServiceInput); ok { @@ -94,30 +100,35 @@ func (s *Shim) Run(pollInterval time.Duration) error { } } gatherPromptCh := make(chan empty, 1) - gatherPromptChans = append(gatherPromptChans, gatherPromptCh) + s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh) wg.Add(1) go func(input telegraf.Input) { startGathering(ctx, input, acc, gatherPromptCh, pollInterval) if serviceInput, ok := input.(telegraf.ServiceInput); ok { serviceInput.Stop() } + close(gatherPromptCh) wg.Done() }(input) } - go stdinCollectMetricsPrompt(ctx, collectMetricsPrompt) + go s.stdinCollectMetricsPrompt(ctx, cancel, collectMetricsPrompt) + go s.closeMetricChannelWhenInputsFinish(&wg) loop: for { select { - case <-quit: + case <-quit: // user-triggered quit // cancel, but keep looping until the metric channel closes. 
cancel() - case <-collectMetricsPrompt: - collectMetrics(ctx) - case m, open := <-metricCh: + case _, open := <-collectMetricsPrompt: + if !open { // stdin-close-triggered quit + cancel() + continue + } + s.collectMetrics(ctx) + case m, open := <-s.metricCh: if !open { - wg.Done() break loop } b, err := serializer.Serialize(m) @@ -129,7 +140,6 @@ loop: } } - wg.Wait() return nil } @@ -142,11 +152,16 @@ func hasQuit(ctx context.Context) bool { } } -func stdinCollectMetricsPrompt(ctx context.Context, collectMetricsPrompt chan<- os.Signal) { - s := bufio.NewScanner(stdin) +func (s *Shim) stdinCollectMetricsPrompt(ctx context.Context, cancel context.CancelFunc, collectMetricsPrompt chan<- os.Signal) { + defer func() { + cancel() + close(collectMetricsPrompt) + }() + + scanner := bufio.NewScanner(stdin) // for every line read from stdin, make sure we're not supposed to quit, // then push a message on to the collectMetricsPrompt - for s.Scan() { + for scanner.Scan() { // first check if we should quit if hasQuit(ctx) { return @@ -159,7 +174,7 @@ func stdinCollectMetricsPrompt(ctx context.Context, collectMetricsPrompt chan<- // pushCollectMetricsRequest pushes a non-blocking (nil) message to the // collectMetricsPrompt channel to trigger metric collection. -// The channel is defined with a buffer of 1, so if it's full, duplicated +// The channel is defined with a buffer of 1, so while it's full, subsequent // requests are discarded. func pushCollectMetricsRequest(collectMetricsPrompt chan<- os.Signal) { select { @@ -168,14 +183,14 @@ func pushCollectMetricsRequest(collectMetricsPrompt chan<- os.Signal) { } } -func collectMetrics(ctx context.Context) { +func (s *Shim) collectMetrics(ctx context.Context) { if hasQuit(ctx) { return } - for i := 0; i < len(gatherPromptChans); i++ { + for i := 0; i < len(s.gatherPromptChans); i++ { // push a message out to each channel to collect metrics. don't block. 
select { - case gatherPromptChans[i] <- empty{}: + case s.gatherPromptChans[i] <- empty{}: default: } } @@ -196,7 +211,11 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu select { case <-ctx.Done(): return - case <-gatherPromptCh: + case _, open := <-gatherPromptCh: + if !open { + // stdin has closed. + return + } if err := input.Gather(acc); err != nil { fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) } @@ -229,7 +248,7 @@ func DefaultImportedPlugins() (i []telegraf.Input, e error) { // LoadConfig loads the config and returns inputs that later need to be loaded. func LoadConfig(filePath *string) ([]telegraf.Input, error) { - if filePath == nil { + if filePath == nil || *filePath == "" { return DefaultImportedPlugins() } @@ -276,3 +295,8 @@ func loadConfigIntoInputs(md toml.MetaData, inputConfigs map[string][]toml.Primi } return renderedInputs, nil } + +func (s *Shim) closeMetricChannelWhenInputsFinish(wg *sync.WaitGroup) { + wg.Wait() + close(s.metricCh) +} diff --git a/plugins/inputs/execd/shim/goshim_notwindows.go b/plugins/inputs/execd/shim/goshim_notwindows.go deleted file mode 100644 index 67d41884f..000000000 --- a/plugins/inputs/execd/shim/goshim_notwindows.go +++ /dev/null @@ -1,14 +0,0 @@ -// +build !windows - -package shim - -import ( - "os" - "os/signal" - "syscall" -) - -func listenForCollectMetricsSignals(collectMetricsPrompt chan os.Signal) { - // just listen to all the signals. 
- signal.Notify(collectMetricsPrompt, syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2) -} diff --git a/plugins/inputs/execd/shim/goshim_posix.go b/plugins/inputs/execd/shim/goshim_posix.go new file mode 100644 index 000000000..4e4a04f14 --- /dev/null +++ b/plugins/inputs/execd/shim/goshim_posix.go @@ -0,0 +1,23 @@ +// +build !windows + +package shim + +import ( + "context" + "os" + "os/signal" + "syscall" +) + +func listenForCollectMetricsSignals(ctx context.Context, collectMetricsPrompt chan os.Signal) { + // just listen to all the signals. + signal.Notify(collectMetricsPrompt, syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2) + + go func() { + select { + case <-ctx.Done(): + // context done. stop listening for signals to avoid pushing messages to a closed channel + signal.Stop(collectMetricsPrompt) + } + }() +} diff --git a/plugins/inputs/execd/shim/goshim_windows.go b/plugins/inputs/execd/shim/goshim_windows.go index a6bfd1ede..317f8a2f3 100644 --- a/plugins/inputs/execd/shim/goshim_windows.go +++ b/plugins/inputs/execd/shim/goshim_windows.go @@ -3,11 +3,20 @@ package shim import ( + "context" "os" "os/signal" "syscall" ) -func listenForCollectMetricsSignals(collectMetricsPrompt chan os.Signal) { +func listenForCollectMetricsSignals(ctx context.Context, collectMetricsPrompt chan os.Signal) { signal.Notify(collectMetricsPrompt, syscall.SIGHUP) + + go func() { + select { + case <-ctx.Done(): + // context done. 
stop listening for signals to avoid pushing messages to a closed channel + signal.Stop(collectMetricsPrompt) + } + }() } diff --git a/plugins/inputs/execd/shim/shim_posix_test.go b/plugins/inputs/execd/shim/shim_posix_test.go index 85053130f..de549cc3c 100644 --- a/plugins/inputs/execd/shim/shim_posix_test.go +++ b/plugins/inputs/execd/shim/shim_posix_test.go @@ -3,11 +3,11 @@ package shim import ( - "bytes" + "bufio" "context" + "io" "os" "runtime" - "strings" "syscall" "testing" "time" @@ -20,15 +20,15 @@ func TestShimUSR1SignalingWorks(t *testing.T) { t.Skip() return } - stdoutBytes := bytes.NewBufferString("") - stdout = stdoutBytes + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + + stdin = stdinReader + stdout = stdoutWriter ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wait := runInputPlugin(t, 40*time.Second) - - // sleep a bit to avoid a race condition where the input hasn't loaded yet. - time.Sleep(10 * time.Millisecond) + metricProcessed, exited := runInputPlugin(t, 20*time.Minute) // signal USR1 to yourself. 
pid := os.Getpid() @@ -54,23 +54,17 @@ func TestShimUSR1SignalingWorks(t *testing.T) { timeout := time.NewTimer(10 * time.Second) select { - case <-wait: + case <-metricProcessed: case <-timeout.C: require.Fail(t, "Timeout waiting for metric to arrive") } + cancel() - for stdoutBytes.Len() == 0 { - select { - case <-timeout.C: - require.Fail(t, "Timeout waiting to read metric from stdout") - return - default: - time.Sleep(10 * time.Millisecond) - } - } + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) - out := string(stdoutBytes.Bytes()) - require.Contains(t, out, "\n") - metricLine := strings.Split(out, "\n")[0] - require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) + stdinWriter.Close() + <-exited } diff --git a/plugins/inputs/execd/shim/shim_test.go b/plugins/inputs/execd/shim/shim_test.go index 9d97bd239..498ef4ab5 100644 --- a/plugins/inputs/execd/shim/shim_test.go +++ b/plugins/inputs/execd/shim/shim_test.go @@ -1,7 +1,9 @@ package shim import ( + "bufio" "bytes" + "io" "strings" "testing" "time" @@ -15,11 +17,13 @@ func TestShimWorks(t *testing.T) { stdoutBytes := bytes.NewBufferString("") stdout = stdoutBytes + stdin, _ = io.Pipe() // hold the stdin pipe open + timeout := time.NewTimer(10 * time.Second) - wait := runInputPlugin(t, 10*time.Millisecond) + metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) select { - case <-wait: + case <-metricProcessed: case <-timeout.C: require.Fail(t, "Timeout waiting for metric to arrive") } @@ -40,55 +44,52 @@ func TestShimWorks(t *testing.T) { } func TestShimStdinSignalingWorks(t *testing.T) { - stdoutBytes := bytes.NewBufferString("") - stdout = stdoutBytes - stdinBytes := bytes.NewBufferString("") - stdin = stdinBytes + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + + stdin = stdinReader + stdout = stdoutWriter timeout := 
time.NewTimer(10 * time.Second) - wait := runInputPlugin(t, 40*time.Second) + metricProcessed, exited := runInputPlugin(t, 40*time.Second) - stdinBytes.WriteString("\n") + stdinWriter.Write([]byte("\n")) select { - case <-wait: + case <-metricProcessed: case <-timeout.C: require.Fail(t, "Timeout waiting for metric to arrive") } - for stdoutBytes.Len() == 0 { - select { - case <-timeout.C: - require.Fail(t, "Timeout waiting to read metric from stdout") - return - default: - time.Sleep(10 * time.Millisecond) - } - } + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) - out := string(stdoutBytes.Bytes()) - require.Contains(t, out, "\n") - metricLine := strings.Split(out, "\n")[0] - require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) + stdinWriter.Close() + // check that it exits cleanly + <-exited } -func runInputPlugin(t *testing.T, timeout time.Duration) chan bool { - wait := make(chan bool) +func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { + metricProcessed = make(chan bool) + exited = make(chan bool) inp := &testInput{ - wait: wait, + metricProcessed: metricProcessed, } shim := New() shim.AddInput(inp) go func() { - err := shim.Run(timeout) // we aren't using the timer here + err := shim.Run(interval) require.NoError(t, err) + exited <- true }() - return wait + return metricProcessed, exited } type testInput struct { - wait chan bool + metricProcessed chan bool } func (i *testInput) SampleConfig() string { @@ -107,7 +108,7 @@ func (i *testInput) Gather(acc telegraf.Accumulator) error { map[string]string{ "tag": "tag", }, time.Unix(1234, 5678)) - i.wait <- true + i.metricProcessed <- true return nil }