Adds command intervals to exec plugin

This commit is contained in:
Josh Palay 2015-09-23 11:21:42 -07:00 committed by Cameron Sparr
parent c4bbc18cb6
commit 0c6c5718fe
3 changed files with 242 additions and 35 deletions

View File

@ -10,6 +10,11 @@ setup the exec plugin with:
[[exec.commands]] [[exec.commands]]
command = "/usr/bin/mycollector --output=json" command = "/usr/bin/mycollector --output=json"
name = "mycollector" name = "mycollector"
interval = 10
``` ```
The name is used as a prefix for the measurements. The name is used as a prefix for the measurements.
The interval is used to determine how often a particular command should be run. Each
time the exec plugin runs, it will only run a particular command if it has been at least
`interval` seconds since the exec plugin last ran the command.

View File

@ -6,8 +6,10 @@ import (
"fmt" "fmt"
"github.com/gonuts/go-shellquote" "github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
"math"
"os/exec" "os/exec"
"sync" "sync"
"time"
) )
const sampleConfig = ` const sampleConfig = `
@ -18,31 +20,45 @@ const sampleConfig = `
# name of the command (used as a prefix for measurements) # name of the command (used as a prefix for measurements)
name = "mycollector" name = "mycollector"
`
type Command struct { # Only run this command if it has been at least this many
Command string # seconds since it last ran
Name string interval = 10
} `
type Exec struct { type Exec struct {
Commands []*Command Commands []*Command
runner Runner runner Runner
clock Clock
}
type Command struct {
Command string
Name string
Interval int
lastRunAt time.Time
} }
type Runner interface { type Runner interface {
Run(string, ...string) ([]byte, error) Run(*Command) ([]byte, error)
} }
type CommandRunner struct { type Clock interface {
Now() time.Time
} }
func NewExec() *Exec { type CommandRunner struct{}
return &Exec{runner: CommandRunner{}}
}
func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { type RealClock struct{}
cmd := exec.Command(command, args...)
func (c CommandRunner) Run(command *Command) ([]byte, error) {
command.lastRunAt = time.Now()
split_cmd, err := shellquote.Split(command.Command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
}
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
var out bytes.Buffer var out bytes.Buffer
cmd.Stdout = &out cmd.Stdout = &out
@ -53,6 +69,14 @@ func (c CommandRunner) Run(command string, args ...string) ([]byte, error) {
return out.Bytes(), nil return out.Bytes(), nil
} }
func (c RealClock) Now() time.Time {
return time.Now()
}
func NewExec() *Exec {
return &Exec{runner: CommandRunner{}, clock: RealClock{}}
}
func (e *Exec) SampleConfig() string { func (e *Exec) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -80,23 +104,28 @@ func (e *Exec) Gather(acc plugins.Accumulator) error {
} }
func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
words, err := shellquote.Split(c.Command) secondsSinceLastRun := 0.0
if err != nil || len(words) == 0 {
return fmt.Errorf("exec: unable to parse command, %s", err) if c.lastRunAt.Unix() == 0 { // means time is uninitialized
secondsSinceLastRun = math.Inf(1)
} else {
secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds()
} }
out, err := e.runner.Run(words[0], words[1:]...) if secondsSinceLastRun >= float64(c.Interval) {
if err != nil { out, err := e.runner.Run(c)
return err if err != nil {
} return err
}
var jsonOut interface{} var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut) err = json.Unmarshal(out, &jsonOut)
if err != nil { if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
} }
processResponse(acc, c.Name, map[string]string{}, jsonOut) processResponse(acc, c.Name, map[string]string{}, jsonOut)
}
return nil return nil
} }

View File

@ -5,9 +5,14 @@ import (
"github.com/influxdb/telegraf/testutil" "github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"math"
"testing" "testing"
"time"
) )
// Midnight 9/22/2015
const baseTimeSeconds = 1442905200
const validJson = ` const validJson = `
{ {
"status": "green", "status": "green",
@ -32,24 +37,52 @@ type runnerMock struct {
err error err error
} }
func newRunnerMock(out []byte, err error) Runner { type clockMock struct {
return &runnerMock{out: out, err: err} now time.Time
} }
func (r runnerMock) Run(command string, args ...string) ([]byte, error) { func newRunnerMock(out []byte, err error) Runner {
return &runnerMock{
out: out,
err: err,
}
}
func (r runnerMock) Run(command *Command) ([]byte, error) {
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
return r.out, nil return r.out, nil
} }
func newClockMock(now time.Time) Clock {
return &clockMock{now: now}
}
func (c clockMock) Now() time.Time {
return c.now
}
func TestExec(t *testing.T) { func TestExec(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil) runner := newRunnerMock([]byte(validJson), nil)
command := Command{Command: "testcommand arg1", Name: "mycollector"} clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
e := &Exec{runner: runner, Commands: []*Command{&command}} command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}
var acc testutil.Accumulator var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc) err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err) require.NoError(t, err)
checkFloat := []struct { checkFloat := []struct {
@ -66,24 +99,164 @@ func TestExec(t *testing.T) {
assert.True(t, acc.CheckValue(c.name, c.value)) assert.True(t, acc.CheckValue(c.name, c.value))
} }
assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored") assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored")
} }
func TestExecMalformed(t *testing.T) { func TestExecMalformed(t *testing.T) {
runner := newRunnerMock([]byte(malformedJson), nil) runner := newRunnerMock([]byte(malformedJson), nil)
command := Command{Command: "badcommand arg1", Name: "mycollector"} clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
e := &Exec{runner: runner, Commands: []*Command{&command}} command := Command{
Command: "badcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}
var acc testutil.Accumulator var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc) err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.Error(t, err) require.Error(t, err)
assert.Equal(t, deltaPoints, 0, "No new points should have been added")
} }
func TestCommandError(t *testing.T) { func TestCommandError(t *testing.T) {
runner := newRunnerMock(nil, fmt.Errorf("exit status code 1")) runner := newRunnerMock(nil, fmt.Errorf("exit status code 1"))
command := Command{Command: "badcommand", Name: "mycollector"} clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
e := &Exec{runner: runner, Commands: []*Command{&command}} command := Command{
Command: "badcommand",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}
var acc testutil.Accumulator var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc) err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.Error(t, err) require.Error(t, err)
assert.Equal(t, deltaPoints, 0, "No new points should have been added")
}
func TestExecNotEnoughTime(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds+5, 0))
command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}
var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)
assert.Equal(t, deltaPoints, 0, "No new points should have been added")
}
func TestExecUninitializedLastRunAt(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds, 0))
command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: math.MaxInt32,
// Uninitialized lastRunAt should default to time.Unix(0, 0), so this should
// run no matter what the interval is
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}
var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)
checkFloat := []struct {
name string
value float64
}{
{"mycollector_num_processes", 82},
{"mycollector_cpu_used", 8234},
{"mycollector_cpu_free", 32},
{"mycollector_percent", 0.81},
}
for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}
assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored")
}
func TestExecOneNotEnoughTimeAndOneEnoughTime(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds+5, 0))
notEnoughTimeCommand := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
enoughTimeCommand := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 3,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&notEnoughTimeCommand, &enoughTimeCommand},
}
var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)
checkFloat := []struct {
name string
value float64
}{
{"mycollector_num_processes", 82},
{"mycollector_cpu_used", 8234},
{"mycollector_cpu_free", 32},
{"mycollector_percent", 0.81},
}
for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}
assert.Equal(t, deltaPoints, 4, "Only one command should have been run")
} }