Implement timeouts for all exec command runners

First, add internal CombinedOutput and Run functions that take a
timeout.

Second, the following instances of command runners need to have timeouts:

    plugins/inputs/ping/ping.go
    125:	out, err := c.CombinedOutput()

    plugins/inputs/exec/exec.go
    91:	if err := cmd.Run(); err != nil {

    plugins/inputs/ipmi_sensor/command.go
    31:	err := cmd.Run()

    plugins/inputs/sysstat/sysstat.go
    194:	out, err := cmd.CombinedOutput()

    plugins/inputs/leofs/leofs.go
    185:	defer cmd.Wait()

    plugins/inputs/sysstat/sysstat.go
    282:	if err := cmd.Wait(); err != nil {

closes #1067
This commit is contained in:
Cameron Sparr 2016-04-28 19:23:45 -06:00
parent cbe32c7482
commit 3f807a9432
8 changed files with 210 additions and 55 deletions

View File

@ -2,13 +2,16 @@ package internal
import ( import (
"bufio" "bufio"
"bytes"
"crypto/rand" "crypto/rand"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"os/exec"
"strings" "strings"
"time" "time"
"unicode" "unicode"
@ -16,6 +19,12 @@ import (
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
// Sentinel errors exported by the internal package.
var (
// TimeoutErr is returned by WaitTimeout after it has killed a command
// that was still running when its timeout expired.
TimeoutErr = errors.New("Command timed out.")
// NotImplementedError reports functionality that is not implemented yet.
NotImplementedError = errors.New("not implemented yet")
)
// Duration just wraps time.Duration // Duration just wraps time.Duration
type Duration struct { type Duration struct {
Duration time.Duration Duration time.Duration
@ -33,8 +42,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil return nil
} }
var NotImplementedError = errors.New("not implemented yet")
// ReadLines reads contents from a file and splits them by new lines. // ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) { func ReadLines(filename string) ([]string, error) {
@ -139,3 +146,48 @@ func SnakeCase(in string) string {
return string(out) return string(out)
} }
// CombinedOutputTimeout starts c with stdout and stderr both directed into a
// single in-memory buffer, waits at most timeout for it to exit, and returns
// whatever the buffer collected together with the wait result.
// A command that is still running when the timeout expires is killed
// (best effort) by WaitTimeout.
// If the command cannot be started, the start error is returned with nil
// output.
func CombinedOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
	output := new(bytes.Buffer)
	// Both streams share the same writer so their output is interleaved in
	// the order it was produced.
	c.Stdout, c.Stderr = output, output

	if startErr := c.Start(); startErr != nil {
		return nil, startErr
	}

	waitErr := WaitTimeout(c, timeout)
	return output.Bytes(), waitErr
}
// RunTimeout starts c and then waits at most timeout for it to exit.
// A command that is still running when the timeout expires is killed
// (best effort) by WaitTimeout. A failure to start the command is returned
// as-is.
func RunTimeout(c *exec.Cmd, timeout time.Duration) error {
	startErr := c.Start()
	if startErr != nil {
		return startErr
	}
	return WaitTimeout(c, timeout)
}
// WaitTimeout waits for the given command to finish with a timeout.
// It assumes the command has already been started.
// If the command times out, it attempts to kill the process.
//
// It returns the result of c.Wait when the command exits before the timeout,
// TimeoutErr when the command had to be killed, or the error from
// Process.Kill when the process could not be killed.
func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Buffered with capacity 1 so the waiting goroutine can always deliver
	// its result and exit, even on the kill-failure path below, which returns
	// without receiving from done. An unbuffered channel would leave that
	// goroutine blocked forever.
	done := make(chan error, 1)
	go func() { done <- c.Wait() }()

	select {
	case err := <-done:
		return err
	case <-timer.C:
		if err := c.Process.Kill(); err != nil {
			log.Printf("FATAL error killing process: %s", err)
			return err
		}
		// wait for the command to return after killing it
		<-done
		return TimeoutErr
	}
}

View File

@ -1,6 +1,12 @@
package internal package internal
import "testing" import (
"os/exec"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type SnakeTest struct { type SnakeTest struct {
input string input string
@ -30,3 +36,73 @@ func TestSnakeCase(t *testing.T) {
} }
} }
} }
// Paths of the external binaries the timeout tests run. The lookup error is
// discarded on purpose: a binary that cannot be found leaves its path empty,
// and each test skips itself when the path it needs is empty.
var (
sleepbin, _ = exec.LookPath("sleep")
echobin, _ = exec.LookPath("echo")
)
// TestRunTimeout checks that RunTimeout kills a long-running command once the
// timeout expires and reports TimeoutErr.
func TestRunTimeout(t *testing.T) {
	if sleepbin == "" {
		t.Skip("'sleep' binary not available on OS, skipping.")
	}

	const timeout = 20 * time.Millisecond
	sleeper := exec.Command(sleepbin, "10")

	began := time.Now()
	runErr := RunTimeout(sleeper, timeout)
	took := time.Since(began)

	assert.Equal(t, TimeoutErr, runErr)
	// The command should be killed at about 20ms; allow headroom up to 75ms.
	assert.True(t, took < 75*time.Millisecond)
}
// TestCombinedOutputTimeout checks that CombinedOutputTimeout kills a
// long-running command once the timeout expires and reports TimeoutErr.
func TestCombinedOutputTimeout(t *testing.T) {
	if sleepbin == "" {
		t.Skip("'sleep' binary not available on OS, skipping.")
	}

	const timeout = 20 * time.Millisecond
	sleeper := exec.Command(sleepbin, "10")

	began := time.Now()
	_, runErr := CombinedOutputTimeout(sleeper, timeout)
	took := time.Since(began)

	assert.Equal(t, TimeoutErr, runErr)
	// The command should be killed at about 20ms; allow headroom up to 75ms.
	assert.True(t, took < 75*time.Millisecond)
}
// TestCombinedOutput checks that CombinedOutputTimeout returns the output of
// a command that finishes well inside its timeout.
func TestCombinedOutput(t *testing.T) {
	if echobin == "" {
		t.Skip("'echo' binary not available on OS, skipping.")
	}

	echo := exec.Command(echobin, "foo")
	output, runErr := CombinedOutputTimeout(echo, time.Second)

	assert.NoError(t, runErr)
	assert.Equal(t, "foo\n", string(output))
}
// TestCombinedOutputError tests that CombinedOutputTimeout and
// exec.Cmd.CombinedOutput return the same output from a failed command.
func TestCombinedOutputError(t *testing.T) {
	if sleepbin == "" {
		t.Skip("'sleep' binary not available on OS, skipping.")
	}

	// Reference run through the standard library. The command is expected to
	// fail; assert that here so the comparison below really is between two
	// failed runs instead of silently discarding this error.
	cmd := exec.Command(sleepbin, "foo")
	expected, err := cmd.CombinedOutput()
	assert.Error(t, err)

	// Same command through the timeout wrapper; it must fail as well and
	// produce identical combined output.
	cmd2 := exec.Command(sleepbin, "foo")
	actual, err := CombinedOutputTimeout(cmd2, time.Second)
	assert.Error(t, err)
	assert.Equal(t, expected, actual)
}
// TestRunError checks that RunTimeout reports an error for a command that
// fails on its own before the timeout expires.
func TestRunError(t *testing.T) {
	if sleepbin == "" {
		t.Skip("'sleep' binary not available on OS, skipping.")
	}

	failing := exec.Command(sleepbin, "foo")
	runErr := RunTimeout(failing, time.Second)

	assert.Error(t, runErr)
}

View File

@ -6,10 +6,12 @@ import (
"os/exec" "os/exec"
"sync" "sync"
"syscall" "syscall"
"time"
"github.com/gonuts/go-shellquote" "github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -19,6 +21,9 @@ const sampleConfig = `
## Commands array ## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
## Timeout for each command to complete.
timeout = "5s"
## measurement name suffix (for separating different commands) ## measurement name suffix (for separating different commands)
name_suffix = "_mycollector" name_suffix = "_mycollector"
@ -32,6 +37,7 @@ const sampleConfig = `
type Exec struct { type Exec struct {
Commands []string Commands []string
Command string Command string
Timeout internal.Duration
parser parsers.Parser parser parsers.Parser
@ -43,7 +49,8 @@ type Exec struct {
func NewExec() *Exec { func NewExec() *Exec {
return &Exec{ return &Exec{
runner: CommandRunner{}, runner: CommandRunner{},
Timeout: internal.Duration{Duration: time.Second * 5},
} }
} }
@ -73,7 +80,11 @@ func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
return nil return nil
} }
func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) { func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
split_cmd, err := shellquote.Split(command) split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 { if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err) return nil, fmt.Errorf("exec: unable to parse command, %s", err)
@ -84,7 +95,7 @@ func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([
var out bytes.Buffer var out bytes.Buffer
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) { switch e.parser.(type) {
case *nagios.NagiosParser: case *nagios.NagiosParser:
AddNagiosState(err, acc) AddNagiosState(err, acc)

View File

@ -1,10 +1,12 @@
package ipmi_sensor package ipmi_sensor
import ( import (
"bytes"
"fmt" "fmt"
"os/exec" "os/exec"
"strings" "strings"
"time"
"github.com/influxdata/telegraf/internal"
) )
type CommandRunner struct{} type CommandRunner struct{}
@ -18,21 +20,16 @@ func (t CommandRunner) cmd(conn *Connection, args ...string) *exec.Cmd {
} }
return exec.Command(path, opts...) return exec.Command(path, opts...)
} }
func (t CommandRunner) Run(conn *Connection, args ...string) (string, error) { func (t CommandRunner) Run(conn *Connection, args ...string) (string, error) {
cmd := t.cmd(conn, args...) cmd := t.cmd(conn, args...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run() output, err := internal.CombinedOutputTimeout(cmd, time.Second*5)
if err != nil { if err != nil {
return "", fmt.Errorf("run %s %s: %s (%s)", return "", fmt.Errorf("run %s %s: %s (%s)",
cmd.Path, strings.Join(cmd.Args, " "), stderr.String(), err) cmd.Path, strings.Join(cmd.Args, " "), string(output), err)
} }
return stdout.String(), err return string(output), err
} }

View File

@ -3,13 +3,16 @@ package leofs
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net/url" "net/url"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
) )
const oid = ".1.3.6.1.4.1.35450" const oid = ".1.3.6.1.4.1.35450"
@ -175,14 +178,18 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return outerr return outerr
} }
func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error { func (l *LeoFS) gatherServer(
endpoint string,
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid) cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return err return err
} }
cmd.Start() cmd.Start()
defer cmd.Wait() defer internal.WaitTimeout(cmd, time.Second*5)
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
if !scanner.Scan() { if !scanner.Scan() {
return fmt.Errorf("Unable to retrieve the node name") return fmt.Errorf("Unable to retrieve the node name")

View File

@ -9,15 +9,17 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
// HostPinger is a function that runs the "ping" function using a list of // HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function // passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go) // for unit test purposes (see ping_test.go)
type HostPinger func(args ...string) (string, error) type HostPinger func(timeout float64, args ...string) (string, error)
type Ping struct { type Ping struct {
// Interval at which to ping (ping -i <INTERVAL>) // Interval at which to ping (ping -i <INTERVAL>)
@ -74,7 +76,7 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
go func(u string) { go func(u string) {
defer wg.Done() defer wg.Done()
args := p.args(u) args := p.args(u)
out, err := p.pingHost(args...) out, err := p.pingHost(p.Timeout, args...)
if err != nil { if err != nil {
// Combine go err + stderr output // Combine go err + stderr output
errorChannel <- errors.New( errorChannel <- errors.New(
@ -116,13 +118,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
return errors.New(strings.Join(errorStrings, "\n")) return errors.New(strings.Join(errorStrings, "\n"))
} }
func hostPinger(args ...string) (string, error) { func hostPinger(timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath("ping") bin, err := exec.LookPath("ping")
if err != nil { if err != nil {
return "", err return "", err
} }
c := exec.Command(bin, args...) c := exec.Command(bin, args...)
out, err := c.CombinedOutput() out, err := internal.CombinedOutputTimeout(c,
time.Second*time.Duration(timeout+1))
return string(out), err return string(out), err
} }

View File

@ -124,7 +124,7 @@ func TestArgs(t *testing.T) {
"Expected: %s Actual: %s", expected, actual) "Expected: %s Actual: %s", expected, actual)
} }
func mockHostPinger(args ...string) (string, error) { func mockHostPinger(timeout float64, args ...string) (string, error) {
return linuxPingOutput, nil return linuxPingOutput, nil
} }
@ -161,7 +161,7 @@ PING www.google.com (216.58.218.164) 56(84) bytes of data.
rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms
` `
func mockLossyHostPinger(args ...string) (string, error) { func mockLossyHostPinger(timeout float64, args ...string) (string, error) {
return lossyPingOutput, nil return lossyPingOutput, nil
} }
@ -192,7 +192,7 @@ Request timeout for icmp_seq 0
2 packets transmitted, 0 packets received, 100.0% packet loss 2 packets transmitted, 0 packets received, 100.0% packet loss
` `
func mockErrorHostPinger(args ...string) (string, error) { func mockErrorHostPinger(timeout float64, args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received") return errorPingOutput, errors.New("No packets received")
} }
@ -215,7 +215,7 @@ func TestBadPingGather(t *testing.T) {
acc.AssertContainsTaggedFields(t, "ping", fields, tags) acc.AssertContainsTaggedFields(t, "ping", fields, tags)
} }
func mockFatalHostPinger(args ...string) (string, error) { func mockFatalHostPinger(timeout float64, args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad") return fatalPingOutput, errors.New("So very bad")
} }

View File

@ -17,6 +17,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -98,31 +99,34 @@ var sampleConfig = `
# group = true # group = true
# #
# #
## Options for the sadf command. The values on the left represent the sadf options and ## Options for the sadf command. The values on the left represent the sadf
## the values on the right their description (wich are used for grouping and prefixing metrics). ## options and the values on the right their description (wich are used for
## grouping and prefixing metrics).
## ##
## Run 'sar -h' or 'man sar' to find out the supported options for your sysstat version. ## Run 'sar -h' or 'man sar' to find out the supported options for your
## sysstat version.
[inputs.sysstat.options] [inputs.sysstat.options]
-C = "cpu" -C = "cpu"
-B = "paging" -B = "paging"
-b = "io" -b = "io"
-d = "disk" # requires DISK activity -d = "disk" # requires DISK activity
"-n ALL" = "network" "-n ALL" = "network"
"-P ALL" = "per_cpu" "-P ALL" = "per_cpu"
-q = "queue" -q = "queue"
-R = "mem" -R = "mem"
-r = "mem_util" -r = "mem_util"
-S = "swap_util" -S = "swap_util"
-u = "cpu_util" -u = "cpu_util"
-v = "inode" -v = "inode"
-W = "swap" -W = "swap"
-w = "task" -w = "task"
# -H = "hugepages" # only available for newer linux distributions # -H = "hugepages" # only available for newer linux distributions
# "-I ALL" = "interrupts" # requires INT activity # "-I ALL" = "interrupts" # requires INT activity
# #
# #
## Device tags can be used to add additional tags for devices. For example the configuration below ## Device tags can be used to add additional tags for devices.
## adds a tag vg with value rootvg for all metrics with sda devices. ## For example the configuration below adds a tag vg with value rootvg for
## all metrics with sda devices.
# [[inputs.sysstat.device_tags.sda]] # [[inputs.sysstat.device_tags.sda]]
# vg = "rootvg" # vg = "rootvg"
` `
@ -174,24 +178,28 @@ func (s *Sysstat) Gather(acc telegraf.Accumulator) error {
return errors.New(strings.Join(errorStrings, "\n")) return errors.New(strings.Join(errorStrings, "\n"))
} }
// collect collects sysstat data with the collector utility sadc. It runs the following command: // collect collects sysstat data with the collector utility sadc.
// It runs the following command:
// Sadc -S <Activity1> -S <Activity2> ... <collectInterval> 2 tmpFile // Sadc -S <Activity1> -S <Activity2> ... <collectInterval> 2 tmpFile
// The above command collects system metrics during <collectInterval> and saves it in binary form to tmpFile. // The above command collects system metrics during <collectInterval> and
// saves it in binary form to tmpFile.
func (s *Sysstat) collect() error { func (s *Sysstat) collect() error {
options := []string{} options := []string{}
for _, act := range s.Activities { for _, act := range s.Activities {
options = append(options, "-S", act) options = append(options, "-S", act)
} }
s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix())) s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix()))
collectInterval := s.interval - parseInterval // collectInterval has to be smaller than the telegraf data collection interval // collectInterval has to be smaller than the telegraf data collection interval
collectInterval := s.interval - parseInterval
if collectInterval < 0 { // If true, interval is not defined yet and Gather is run for the first time. // If true, interval is not defined yet and Gather is run for the first time.
if collectInterval < 0 {
collectInterval = 1 // In that case we only collect for 1 second. collectInterval = 1 // In that case we only collect for 1 second.
} }
options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile) options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile)
cmd := execCommand(s.Sadc, options...) cmd := execCommand(s.Sadc, options...)
out, err := cmd.CombinedOutput() out, err := internal.CombinedOutputTimeout(cmd, time.Second*5)
if err != nil { if err != nil {
return fmt.Errorf("failed to run command %s: %s", strings.Join(cmd.Args, " "), string(out)) return fmt.Errorf("failed to run command %s: %s", strings.Join(cmd.Args, " "), string(out))
} }
@ -279,8 +287,9 @@ func (s *Sysstat) parse(acc telegraf.Accumulator, option string, ts time.Time) e
acc.AddFields(measurement, v.fields, v.tags, ts) acc.AddFields(measurement, v.fields, v.tags, ts)
} }
} }
if err := cmd.Wait(); err != nil { if err := internal.WaitTimeout(cmd, time.Second*5); err != nil {
return fmt.Errorf("command %s failed with %s", strings.Join(cmd.Args, " "), err) return fmt.Errorf("command %s failed with %s",
strings.Join(cmd.Args, " "), err)
} }
return nil return nil
} }