From a20e6953d27397f61a3ea4e385d5e934044b17dc Mon Sep 17 00:00:00 2001 From: Jan Graichen Date: Fri, 28 Feb 2020 19:46:03 +0100 Subject: [PATCH] Add an exec daemon plugin (#4424) --- README.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/execd/README.md | 111 +++++++++++++ plugins/inputs/execd/examples/count.go | 24 +++ plugins/inputs/execd/examples/count.rb | 13 ++ plugins/inputs/execd/examples/count.sh | 12 ++ plugins/inputs/execd/execd.go | 209 +++++++++++++++++++++++++ plugins/inputs/execd/execd_unix.go | 33 ++++ plugins/inputs/execd/execd_win.go | 26 +++ 9 files changed, 430 insertions(+) create mode 100644 plugins/inputs/execd/README.md create mode 100644 plugins/inputs/execd/examples/count.go create mode 100755 plugins/inputs/execd/examples/count.rb create mode 100755 plugins/inputs/execd/examples/count.sh create mode 100644 plugins/inputs/execd/execd.go create mode 100644 plugins/inputs/execd/execd_unix.go create mode 100644 plugins/inputs/execd/execd_win.go diff --git a/README.md b/README.md index 0df0f003c..49a4a456e 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,7 @@ For documentation on the latest development code see the [documentation index][d * [elasticsearch](./plugins/inputs/elasticsearch) * [ethtool](./plugins/inputs/ethtool) * [exec](./plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios) +* [execd](./plugins/inputs/execd) * [fail2ban](./plugins/inputs/fail2ban) * [fibaro](./plugins/inputs/fibaro) * [file](./plugins/inputs/file) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index dec04b397..274d7fd41 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -40,6 +40,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/ethtool" _ "github.com/influxdata/telegraf/plugins/inputs/exec" + _ "github.com/influxdata/telegraf/plugins/inputs/execd" _ 
"github.com/influxdata/telegraf/plugins/inputs/fail2ban" _ "github.com/influxdata/telegraf/plugins/inputs/fibaro" _ "github.com/influxdata/telegraf/plugins/inputs/file" diff --git a/plugins/inputs/execd/README.md b/plugins/inputs/execd/README.md new file mode 100644 index 000000000..1205fdd56 --- /dev/null +++ b/plugins/inputs/execd/README.md @@ -0,0 +1,111 @@ +# Execd Input Plugin + +The `execd` plugin runs an external program as a daemon. The program must output metrics in any one of the accepted [Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md) on its standard output. + +The `signal` option can be configured to send a signal to the running daemon on each collection interval. + +Program output on standard error is mirrored to the telegraf log. + +### Configuration: + +```toml + ## Program to run as daemon + command = ["telegraf-smartctl", "-d", "/dev/sda"] + + ## Define how the process is signaled on each collection interval. + + ## Valid values are: + ## "none" : Do not signal anything. + ## The process must output metrics by itself. + ## "STDIN" : Send a newline on STDIN. + ## "SIGHUP" : Send a HUP signal. Not available on Windows. + ## "SIGUSR1" : Send a USR1 signal. Not available on Windows. + ## "SIGUSR2" : Send a USR2 signal. Not available on Windows. + signal = "none" + + ## Delay before the process is restarted after an unexpected termination + restart_delay = "10s" + + ## Data format to consume. 
+ ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +### Example + +##### Daemon written in bash using STDIN signaling + +```bash +#!/bin/bash + +counter=0 + +while IFS= read -r LINE; do + echo "counter_bash count=${counter}" + let counter=counter+1 +done +``` + +```toml +[[inputs.execd]] + command = ["plugins/inputs/execd/examples/count.sh"] + signal = "STDIN" +``` + +##### Go daemon using SIGHUP + +```go +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + + counter := 0 + + for { + <-c + + fmt.Printf("counter_go count=%d\n", counter) + counter++ + } +} + +``` + +```toml +[[inputs.execd]] + command = ["plugins/inputs/execd/examples/count.go.exe"] + signal = "SIGHUP" +``` + +##### Ruby daemon running standalone + +```ruby +#!/usr/bin/env ruby + +counter = 0 + +loop do + puts "counter_ruby count=#{counter}" + STDOUT.flush + + counter += 1 + sleep 1 +end +``` + +```toml +[[inputs.execd]] + command = ["plugins/inputs/execd/examples/count.rb"] + signal = "none" +``` diff --git a/plugins/inputs/execd/examples/count.go b/plugins/inputs/execd/examples/count.go new file mode 100644 index 000000000..d5e4a12e1 --- /dev/null +++ b/plugins/inputs/execd/examples/count.go @@ -0,0 +1,24 @@ +package main + +// Example using HUP signaling: each SIGHUP received makes the program print one counter_go metric line and then increment the counter. + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + + counter := 0 + + for { + <-c + + fmt.Printf("counter_go count=%d\n", counter) + counter++ + } +} diff --git a/plugins/inputs/execd/examples/count.rb b/plugins/inputs/execd/examples/count.rb new file mode 100755 index 000000000..220848d64 --- /dev/null +++ b/plugins/inputs/execd/examples/count.rb @@ -0,0 +1,13 @@ +#!/usr/bin/env ruby + +## 
Example in Ruby not using any signaling
+
+counter = 0
+
+loop do
+  puts "counter_ruby count=#{counter}"
+  STDOUT.flush
+  counter += 1
+
+  sleep 1
+end
diff --git a/plugins/inputs/execd/examples/count.sh b/plugins/inputs/execd/examples/count.sh
new file mode 100755
index 000000000..aa6932a80
--- /dev/null
+++ b/plugins/inputs/execd/examples/count.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+## Example in bash using STDIN signaling
+
+counter=0
+
+while read; do
+  echo "counter_bash count=${counter}"
+  let counter=counter+1
+done
+
+(>&2 echo "terminate")
diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go
new file mode 100644
index 000000000..0d1fc7cc5
--- /dev/null
+++ b/plugins/inputs/execd/execd.go
@@ -0,0 +1,246 @@
+package execd
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"log"
+	"os/exec"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+// sampleConfig is the example configuration returned by SampleConfig.
+const sampleConfig = `
+  ## Program to run as daemon
+  command = ["telegraf-smartctl", "-d", "/dev/sda"]
+
+  ## Define how the process is signaled on each collection interval.
+
+  ## Valid values are:
+  ##   "none"    : Do not signal anything.
+  ##               The process must output metrics by itself.
+  ##   "STDIN"   : Send a newline on STDIN.
+  ##   "SIGHUP"  : Send a HUP signal. Not available on Windows.
+  ##   "SIGUSR1" : Send a USR1 signal. Not available on Windows.
+  ##   "SIGUSR2" : Send a USR2 signal. Not available on Windows.
+  signal = "none"
+
+  ## Delay before the process is restarted after an unexpected termination
+  restart_delay = "10s"
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+`
+
+// Execd is an input plugin that runs an external program as a long-lived
+// child process, parses every line the program writes to standard output
+// into metrics and restarts the program after RestartDelay when it exits.
+type Execd struct {
+	// Command is the program to run, followed by its arguments.
+	Command []string
+
+	// Signal selects how the process is prompted on each Gather call.
+	Signal string
+
+	// RestartDelay is the pause before a terminated process is restarted.
+	RestartDelay internal.Duration
+
+	// NOTE(review): cmd and stdin are assigned by cmdRun on its own
+	// goroutine and read by cmdLoop and Gather without synchronization.
+	acc    telegraf.Accumulator
+	cmd    *exec.Cmd
+	parser parsers.Parser
+	stdin  io.WriteCloser
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+// SampleConfig returns the example configuration of the plugin.
+func (e *Execd) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line summary of the plugin.
+func (e *Execd) Description() string {
+	return "Run executable as long-running input plugin"
+}
+
+// SetParser sets the parser that turns process output into metrics.
+func (e *Execd) SetParser(parser parsers.Parser) {
+	e.parser = parser
+}
+
+// Start stores the accumulator and launches the background loop that runs
+// and supervises the configured command. It returns an error when no
+// command is configured.
+func (e *Execd) Start(acc telegraf.Accumulator) error {
+	e.acc = acc
+
+	if len(e.Command) == 0 {
+		return fmt.Errorf("E! [inputs.execd] FATAL no command specified")
+	}
+
+	e.wg.Add(1)
+
+	var ctx context.Context
+	ctx, e.cancel = context.WithCancel(context.Background())
+
+	go func() {
+		e.cmdLoop(ctx)
+		e.wg.Done()
+	}()
+
+	return nil
+}
+
+// Stop cancels the supervising loop and waits for it to return.
+func (e *Execd) Stop() {
+	// cancel is only assigned once Start got past its command check.
+	if e.cancel != nil {
+		e.cancel()
+	}
+	e.wg.Wait()
+}
+
+// cmdLoop runs the command and restarts it after RestartDelay whenever it
+// terminates, until ctx is cancelled.
+func (e *Execd) cmdLoop(ctx context.Context) {
+	for {
+		// Use a buffered channel to ensure goroutine below can exit
+		// if `ctx.Done` is selected and nothing reads on `done` anymore
+		done := make(chan error, 1)
+		go func() {
+			done <- e.cmdRun()
+		}()
+
+		select {
+		case <-ctx.Done():
+			// Immediately exit process but with a graceful shutdown
+			// period before killing
+			internal.WaitTimeout(e.cmd, 0)
+			return
+		case err := <-done:
+			// err is nil when the process exits cleanly; %v prints that as
+			// "<nil>" where %s would print "%!s(<nil>)".
+			log.Printf("E! [inputs.execd] Process %s terminated: %v", e.Command, err)
+		}
+
+		log.Printf("E! [inputs.execd] Restarting in %s...", e.RestartDelay.Duration)
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(e.RestartDelay.Duration):
+			// Continue the loop and restart the process
+		}
+	}
+}
+
+// cmdRun starts the configured command, feeds its standard output to the
+// parser and its standard error to the log, and blocks until the process
+// has exited. It returns the error from creating a pipe, from starting the
+// process or from waiting on it.
+func (e *Execd) cmdRun() error {
+	var wg sync.WaitGroup
+
+	// A variadic call with an empty slice passes no arguments, so a command
+	// without arguments needs no special case.
+	e.cmd = exec.Command(e.Command[0], e.Command[1:]...)
+
+	stdin, err := e.cmd.StdinPipe()
+	if err != nil {
+		return fmt.Errorf("E! [inputs.execd] Error opening stdin pipe: %s", err)
+	}
+
+	e.stdin = stdin
+
+	stdout, err := e.cmd.StdoutPipe()
+	if err != nil {
+		return fmt.Errorf("E! [inputs.execd] Error opening stdout pipe: %s", err)
+	}
+
+	stderr, err := e.cmd.StderrPipe()
+	if err != nil {
+		return fmt.Errorf("E! [inputs.execd] Error opening stderr pipe: %s", err)
+	}
+
+	log.Printf("D! [inputs.execd] Starting process: %s", e.Command)
+
+	err = e.cmd.Start()
+	if err != nil {
+		return fmt.Errorf("E! [inputs.execd] Error starting process: %s", err)
+	}
+
+	wg.Add(2)
+
+	go func() {
+		e.cmdReadOut(stdout)
+		wg.Done()
+	}()
+
+	go func() {
+		e.cmdReadErr(stderr)
+		wg.Done()
+	}()
+
+	// Finish reading both pipes before calling Wait (see os/exec pipe docs).
+	wg.Wait()
+	return e.cmd.Wait()
+}
+
+// cmdReadOut parses every line read from out and adds the resulting metrics
+// to the accumulator. Parse and read errors are reported through the
+// accumulator too.
+func (e *Execd) cmdReadOut(out io.Reader) {
+	scanner := bufio.NewScanner(out)
+
+	for scanner.Scan() {
+		metrics, err := e.parser.Parse(scanner.Bytes())
+		if err != nil {
+			e.acc.AddError(fmt.Errorf("E! [inputs.execd] Parse error: %s", err))
+		}
+
+		for _, metric := range metrics {
+			e.acc.AddMetric(metric)
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stdout: %s", err))
+	}
+}
+
+// cmdReadErr writes every line read from out to the log and reports a read
+// error through the accumulator.
+func (e *Execd) cmdReadErr(out io.Reader) {
+	scanner := bufio.NewScanner(out)
+
+	for scanner.Scan() {
+		log.Printf("E! [inputs.execd] stderr: %q", scanner.Text())
+	}
+
+	if err := scanner.Err(); err != nil {
+		e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stderr: %s", err))
+	}
+}
+
+// init registers the plugin as "execd", with signaling disabled and a
+// restart delay of ten seconds by default.
+func init() {
+	inputs.Add("execd", func() telegraf.Input {
+		return &Execd{
+			Signal:       "none",
+			RestartDelay: internal.Duration{Duration: 10 * time.Second},
+		}
+	})
+}
diff --git a/plugins/inputs/execd/execd_unix.go b/plugins/inputs/execd/execd_unix.go
new file mode 100644
index 000000000..a092cfc80
--- /dev/null
+++ b/plugins/inputs/execd/execd_unix.go
@@ -0,0 +1,41 @@
+// +build !windows
+
+package execd
+
+import (
+	"fmt"
+	"io"
+	"syscall"
+
+	"github.com/influxdata/telegraf"
+)
+
+// Gather prompts the running process according to the configured Signal.
+// It does nothing while no process has been started, and returns an error
+// when Signal is not a known value or when delivering the prompt fails.
+func (e *Execd) Gather(acc telegraf.Accumulator) error {
+	if e.cmd == nil || e.cmd.Process == nil {
+		return nil
+	}
+
+	var err error
+	switch e.Signal {
+	case "SIGHUP":
+		err = e.cmd.Process.Signal(syscall.SIGHUP)
+	case "SIGUSR1":
+		err = e.cmd.Process.Signal(syscall.SIGUSR1)
+	case "SIGUSR2":
+		err = e.cmd.Process.Signal(syscall.SIGUSR2)
+	case "STDIN":
+		_, err = io.WriteString(e.stdin, "\n")
+	case "none":
+	default:
+		return fmt.Errorf("invalid signal: %s", e.Signal)
+	}
+
+	if err != nil {
+		return fmt.Errorf("error signaling process via %s: %s", e.Signal, err)
+	}
+
+	return nil
+}
diff --git a/plugins/inputs/execd/execd_win.go b/plugins/inputs/execd/execd_win.go
new file mode 100644
index 000000000..85ced4a6a
--- /dev/null
+++ b/plugins/inputs/execd/execd_win.go
@@ -0,0 +1,31 @@
+// +build windows
+
+package execd
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/influxdata/telegraf"
+)
+
+// Gather prompts the running process according to the configured Signal.
+// It does nothing while no process has been started, and returns an error
+// when Signal is not a known value or when writing to stdin fails.
+func (e *Execd) Gather(acc telegraf.Accumulator) error {
+	if e.cmd == nil || e.cmd.Process == nil {
+		return nil
+	}
+
+	switch e.Signal {
+	case "STDIN":
+		if _, err := io.WriteString(e.stdin, "\n"); err != nil {
+			return fmt.Errorf("error signaling process via %s: %s", e.Signal, err)
+		}
+	case "none":
+	default:
+		return fmt.Errorf("invalid signal: %s", e.Signal)
+	}
+
+	return nil
+}