Add nagios parser for exec input plugin

closes #762
This commit is contained in:
Thibault Cohen 2016-02-24 23:32:22 -05:00 committed by Michele Fadda
parent f188f8eade
commit f603b4bfc8
8 changed files with 264 additions and 8 deletions

View File

@ -10,6 +10,7 @@
- [#844](https://github.com/influxdata/telegraf/pull/844): postgres_extensible plugin added. Thanks @menardorama! - [#844](https://github.com/influxdata/telegraf/pull/844): postgres_extensible plugin added. Thanks @menardorama!
- [#866](https://github.com/influxdata/telegraf/pull/866): couchbase input plugin. Thanks @ljosa! - [#866](https://github.com/influxdata/telegraf/pull/866): couchbase input plugin. Thanks @ljosa!
- [#789](https://github.com/influxdata/telegraf/pull/789): Support multiple field specification and `field*` in graphite templates. Thanks @chrusty! - [#789](https://github.com/influxdata/telegraf/pull/789): Support multiple field specification and `field*` in graphite templates. Thanks @chrusty!
- [#762](https://github.com/influxdata/telegraf/pull/762): Nagios parser for the exec plugin. Thanks @titilambert!
### Bugfixes ### Bugfixes
- [#890](https://github.com/influxdata/telegraf/issues/890): Create TLS config even if only ssl_ca is provided. - [#890](https://github.com/influxdata/telegraf/issues/890): Create TLS config even if only ssl_ca is provided.

View File

@ -191,7 +191,7 @@ Currently implemented sources:
* docker * docker
* dovecot * dovecot
* elasticsearch * elasticsearch
* exec (generic executable plugin, support JSON, influx and graphite) * exec (generic executable plugin, support JSON, influx, graphite and nagios)
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* influxdb * influxdb

View File

@ -326,3 +326,27 @@ There are many more options available,
"measurement*" "measurement*"
] ]
``` ```
## Nagios:
There are no additional configuration options for Nagios line-protocol. The
metrics are parsed directly into Telegraf metrics.
Note: Nagios Input Data Formats is only supported in `exec` input plugin.
#### Nagios Configuration:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/lib/nagios/plugins/check_load", "-w 5,6,7 -c 7,8,9"]
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Data format to consume. This can be "json", "influx", "graphite" or "nagios"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "nagios"
```

View File

@ -5,12 +5,14 @@ import (
"fmt" "fmt"
"os/exec" "os/exec"
"sync" "sync"
"syscall"
"github.com/gonuts/go-shellquote" "github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
) )
const sampleConfig = ` const sampleConfig = `
@ -20,7 +22,7 @@ const sampleConfig = `
## measurement name suffix (for separating different commands) ## measurement name suffix (for separating different commands)
name_suffix = "_mycollector" name_suffix = "_mycollector"
## Data format to consume. This can be "json", "influx" or "graphite" ## Data format to consume. This can be "json", "influx", "graphite" or "nagios
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
@ -46,12 +48,32 @@ func NewExec() *Exec {
} }
type Runner interface { type Runner interface {
Run(*Exec, string) ([]byte, error) Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
} }
type CommandRunner struct{} type CommandRunner struct{}
func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) { func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}
func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
split_cmd, err := shellquote.Split(command) split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 { if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err) return nil, fmt.Errorf("exec: unable to parse command, %s", err)
@ -63,7 +85,17 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, command) switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
} }
return out.Bytes(), nil return out.Bytes(), nil
@ -72,7 +104,7 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done() defer e.wg.Done()
out, err := e.runner.Run(e, command) out, err := e.runner.Run(e, command, acc)
if err != nil { if err != nil {
e.errChan <- err e.errChan <- err
return return

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -57,7 +58,7 @@ func newRunnerMock(out []byte, err error) Runner {
} }
} }
func (r runnerMock) Run(e *Exec, command string) ([]byte, error) { func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }

View File

@ -0,0 +1,102 @@
package nagios
import (
"regexp"
"strings"
"time"
"github.com/influxdata/telegraf"
)
type NagiosParser struct {
MetricName string
DefaultTags map[string]string
}
// Got from Alignak
// https://github.com/Alignak-monitoring/alignak/blob/develop/alignak/misc/perfdata.py
var perfSplitRegExp, _ = regexp.Compile(`([^=]+=\S+)`)
var nagiosRegExp, _ = regexp.Compile(`^([^=]+)=([\d\.\-\+eE]+)([\w\/%]*);?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE]+)?;?([\d\.\-\+eE]+)?;?\s*`)
func (p *NagiosParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
return metrics[0], err
}
func (p *NagiosParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
//> rta,host=absol,unit=ms critical=6000,min=0,value=0.332,warning=4000 1456374625003628099
//> pl,host=absol,unit=% critical=90,min=0,value=0,warning=80 1456374625003693967
func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
// Convert to string
out := string(buf)
// Prepare output for splitting
// Delete escaped pipes
out = strings.Replace(out, `\|`, "___PROTECT_PIPE___", -1)
// Split lines and get the first one
lines := strings.Split(out, "\n")
// Split output and perfdatas
data_splitted := strings.Split(lines[0], "|")
if len(data_splitted) <= 1 {
// No pipe == no perf data
return nil, nil
}
// Get perfdatas
perfdatas := data_splitted[1]
// Add escaped pipes
perfdatas = strings.Replace(perfdatas, "___PROTECT_PIPE___", `\|`, -1)
// Split perfs
unParsedPerfs := perfSplitRegExp.FindAllSubmatch([]byte(perfdatas), -1)
// Iterate on all perfs
for _, unParsedPerfs := range unParsedPerfs {
// Get metrics
// Trim perf
trimedPerf := strings.Trim(string(unParsedPerfs[0]), " ")
// Parse perf
perf := nagiosRegExp.FindAllSubmatch([]byte(trimedPerf), -1)
// Bad string
if len(perf) == 0 {
continue
}
if len(perf[0]) <= 2 {
continue
}
if perf[0][1] == nil || perf[0][2] == nil {
continue
}
fieldName := string(perf[0][1])
tags := make(map[string]string)
if perf[0][3] != nil {
tags["unit"] = string(perf[0][3])
}
fields := make(map[string]interface{})
fields["value"] = perf[0][2]
// TODO should we set empty field
// if metric if there is no data ?
if perf[0][4] != nil {
fields["warning"] = perf[0][4]
}
if perf[0][5] != nil {
fields["critical"] = perf[0][5]
}
if perf[0][6] != nil {
fields["min"] = perf[0][6]
}
if perf[0][7] != nil {
fields["max"] = perf[0][7]
}
// Create metric
metric, err := telegraf.NewMetric(fieldName, tags, fields, time.Now().UTC())
if err != nil {
return nil, err
}
// Add Metric
metrics = append(metrics, metric)
}
return metrics, nil
}

View File

@ -0,0 +1,89 @@
package nagios
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const validOutput1 = `PING OK - Packet loss = 0%, RTA = 0.30 ms|rta=0.298000ms;4000.000000;6000.000000;0.000000 pl=0%;80;90;0;100
This is a long output
with three lines
`
const validOutput2 = "TCP OK - 0.008 second response time on port 80|time=0.008457s;;;0.000000;10.000000"
const validOutput3 = "TCP OK - 0.008 second response time on port 80|time=0.008457"
const invalidOutput3 = "PING OK - Packet loss = 0%, RTA = 0.30 ms"
const invalidOutput4 = "PING OK - Packet loss = 0%, RTA = 0.30 ms| =3;;;; dgasdg =;;;; sff=;;;;"
func TestParseValidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}
// Output1
metrics, err := parser.Parse([]byte(validOutput1))
require.NoError(t, err)
assert.Len(t, metrics, 2)
// rta
assert.Equal(t, "rta", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.298),
"warning": float64(4000),
"critical": float64(6000),
"min": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "ms"}, metrics[0].Tags())
// pl
assert.Equal(t, "pl", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0),
"warning": float64(80),
"critical": float64(90),
"min": float64(0),
"max": float64(100),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{"unit": "%"}, metrics[1].Tags())
// Output2
metrics, err = parser.Parse([]byte(validOutput2))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
"min": float64(0),
"max": float64(10),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "s"}, metrics[0].Tags())
// Output3
metrics, err = parser.Parse([]byte(validOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
}
func TestParseInvalidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}
// invalidOutput3
metrics, err := parser.Parse([]byte(invalidOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 0)
// invalidOutput4
metrics, err = parser.Parse([]byte(invalidOutput4))
require.NoError(t, err)
assert.Len(t, metrics, 0)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/plugins/parsers/value"
) )
@ -39,7 +40,7 @@ type Parser interface {
// Config is a struct that covers the data types needed for all parser types, // Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers. // and can be used to instantiate _any_ of the parsers.
type Config struct { type Config struct {
// Dataformat can be one of: json, influx, graphite, value // Dataformat can be one of: json, influx, graphite, value, nagios
DataFormat string DataFormat string
// Separator only applied to Graphite data. // Separator only applied to Graphite data.
@ -72,6 +73,8 @@ func NewParser(config *Config) (Parser, error) {
config.DataType, config.DefaultTags) config.DataType, config.DefaultTags)
case "influx": case "influx":
parser, err = NewInfluxParser() parser, err = NewInfluxParser()
case "nagios":
parser, err = NewNagiosParser()
case "graphite": case "graphite":
parser, err = NewGraphiteParser(config.Separator, parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags) config.Templates, config.DefaultTags)
@ -94,6 +97,10 @@ func NewJSONParser(
return parser, nil return parser, nil
} }
func NewNagiosParser() (Parser, error) {
return &nagios.NagiosParser{}, nil
}
func NewInfluxParser() (Parser, error) { func NewInfluxParser() (Parser, error) {
return &influx.InfluxParser{}, nil return &influx.InfluxParser{}, nil
} }