telegraf/plugins/inputs/exec/exec.go

176 lines
3.6 KiB
Go

package exec
import (
"bytes"
"fmt"
"os/exec"
"path/filepath"
"sync"
"syscall"
"github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
)
const sampleConfig = `
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
type Exec struct {
Commands []string
Command string
parser parsers.Parser
wg sync.WaitGroup
runner Runner
errChan chan error
}
func NewExec() *Exec {
return &Exec{
runner: CommandRunner{},
}
}
type Runner interface {
Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
}
type CommandRunner struct{}
func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}
func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
}
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
}
return out.Bytes(), nil
}
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()
out, err := e.runner.Run(e, command, acc)
if err != nil {
e.errChan <- err
return
}
metrics, err := e.parser.Parse(out)
if err != nil {
e.errChan <- err
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
}
func (e *Exec) SampleConfig() string {
return sampleConfig
}
func (e *Exec) Description() string {
return "Read metrics from one or more commands that can output to stdout"
}
func (e *Exec) SetParser(parser parsers.Parser) {
e.parser = parser
}
func (e *Exec) Gather(acc telegraf.Accumulator) error {
// Legacy single command support
if e.Command != "" {
e.Commands = append(e.Commands, e.Command)
e.Command = ""
}
commands := make([]string, len(e.Commands))
for _, pattern := range e.Commands {
matches, err := filepath.Glob(pattern)
if err != nil {
return err
}
commands = append(commands, matches...)
}
e.errChan = make(chan error, len(commands))
e.wg.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(command, acc)
}
e.wg.Wait()
select {
default:
close(e.errChan)
return nil
case err := <-e.errChan:
close(e.errChan)
return err
}
}
func init() {
inputs.Add("exec", func() telegraf.Input {
return NewExec()
})
}