Add Graphite line protocol parsing to exec plugin

closes #637
This commit is contained in:
Henry Hu 2016-02-01 11:43:38 +08:00 committed by Cameron Sparr
parent 6b06a23102
commit 1449c8b887
12 changed files with 951 additions and 60 deletions

View File

@ -159,7 +159,7 @@ Currently implemented sources:
* disque
* docker
* elasticsearch
* exec (generic JSON-emitting executable plugin)
* exec (generic executable plugin, support JSON, influx and graphite)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb

View File

@ -0,0 +1,31 @@
package encoding
import (
"fmt"
"github.com/influxdata/telegraf"
)
type Parser interface {
InitConfig(configs map[string]interface{}) error
Parse(buf []byte) ([]telegraf.Metric, error)
ParseLine(line string) (telegraf.Metric, error)
}
type Creator func() Parser
var Parsers = map[string]Creator{}
func Add(name string, creator Creator) {
Parsers[name] = creator
}
func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) {
creator := Parsers[dataFormat]
if creator == nil {
return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat)
}
parser = creator()
err = parser.InitConfig(configs)
return parser, err
}

View File

@ -0,0 +1,135 @@
package graphite
import (
"fmt"
"strings"
)
const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
Separator string
Templates []string
}
// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}
return nil
}
func (c *Config) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}
for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}
template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}
if len(parts) == 3 {
tags = parts[2]
}
// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}
// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}
if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}
if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}
func (c *Config) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}
if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}
return nil
}
func (c *Config) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}
if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}
func (c *Config) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}
if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}
return nil
}

View File

@ -0,0 +1,14 @@
package graphite
import "fmt"
// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}
func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}

View File

@ -0,0 +1,414 @@
package graphite
import (
"bytes"
"fmt"
"io"
"math"
"sort"
"strconv"
"strings"
"time"
"bufio"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding"
)
// Minimum and maximum supported dates for timestamps.
var (
MinDate = time.Date(1901, 12, 13, 0, 0, 0, 0, time.UTC)
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
)
// Options are configurable values that can be provided to a Parser
type Options struct {
Separator string
Templates []string
}
// Parser encapsulates a Graphite Parser.
type GraphiteParser struct {
matcher *matcher
}
func NewParser() *GraphiteParser {
return &GraphiteParser{}
}
func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error {
var err error
options := Options{
Templates: configs["Templates"].([]string),
Separator: configs["Separator"].(string)}
matcher := newMatcher()
p.matcher = matcher
defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator)
matcher.AddDefaultTemplate(defaultTemplate)
for _, pattern := range options.Templates {
template := pattern
filter := ""
// Format is [filter] <template> [tag1=value1,tag2=value2]
parts := strings.Fields(pattern)
if len(parts) < 1 {
continue
} else if len(parts) >= 2 {
if strings.Contains(parts[1], "=") {
template = parts[0]
} else {
filter = parts[0]
template = parts[1]
}
}
// Parse out the default tags specific to this template
tags := models.Tags{}
if strings.Contains(parts[len(parts)-1], "=") {
tagStrs := strings.Split(parts[len(parts)-1], ",")
for _, kv := range tagStrs {
parts := strings.Split(kv, "=")
tags[parts[0]] = parts[1]
}
}
tmpl, err1 := NewTemplate(template, tags, options.Separator)
if err1 != nil {
err = err1
break
}
matcher.Add(filter, tmpl)
}
if err != nil {
return fmt.Errorf("exec input parser config is error: %s ", err.Error())
} else {
return nil
}
}
func init() {
encoding.Add("graphite", func() encoding.Parser {
return NewParser()
})
}
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics := make([]telegraf.Metric, 0)
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err == io.EOF {
return metrics, nil
}
if err != nil && err != io.EOF {
return metrics, err
}
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
if metric, err := p.ParseLine(line); err == nil {
metrics = append(metrics, metric)
}
}
}
// Parse performs Graphite parsing of a single line.
func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have required fields", line)
}
// decode the name and tags
template := p.matcher.Match(fields[0])
measurement, tags, field, err := template.Apply(fields[0])
if err != nil {
return nil, err
}
// Could not extract measurement, use the raw value
if measurement == "" {
measurement = fields[0]
}
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" value: %s`, fields[0], err)
}
if math.IsNaN(v) || math.IsInf(v, 0) {
return nil, &UnsupposedValueError{Field: fields[0], Value: v}
}
fieldValues := map[string]interface{}{}
if field != "" {
fieldValues[field] = v
} else {
fieldValues["value"] = v
}
// If no 3rd field, use now as timestamp
timestamp := time.Now().UTC()
if len(fields) == 3 {
// Parse timestamp.
unixTime, err := strconv.ParseFloat(fields[2], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" time: %s`, fields[0], err)
}
// -1 is a special value that gets converted to current UTC time
// See https://github.com/graphite-project/carbon/issues/54
if unixTime != float64(-1) {
// Check if we have fractional seconds
timestamp = time.Unix(int64(unixTime), int64((unixTime-math.Floor(unixTime))*float64(time.Second)))
if timestamp.Before(MinDate) || timestamp.After(MaxDate) {
return nil, fmt.Errorf("timestamp out of range")
}
}
}
return telegraf.NewMetric(measurement, tags, fieldValues, timestamp)
}
// ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags.
func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line)
if len(fields) == 0 {
return "", make(map[string]string), "", nil
}
// decode the name and tags
template := p.matcher.Match(fields[0])
name, tags, field, err := template.Apply(fields[0])
return name, tags, field, err
}
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
type template struct {
tags []string
defaultTags models.Tags
greedyMeasurement bool
separator string
}
// NewTemplate returns a new template ensuring it has a measurement
// specified.
func NewTemplate(pattern string, defaultTags models.Tags, separator string) (*template, error) {
tags := strings.Split(pattern, ".")
hasMeasurement := false
template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
for _, tag := range tags {
if strings.HasPrefix(tag, "measurement") {
hasMeasurement = true
}
if tag == "measurement*" {
template.greedyMeasurement = true
}
}
if !hasMeasurement {
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
}
return template, nil
}
// Apply extracts the template fields from the given line and returns the measurement
// name and tags
func (t *template) Apply(line string) (string, map[string]string, string, error) {
fields := strings.Split(line, ".")
var (
measurement []string
tags = make(map[string]string)
field string
)
// Set any default tags
for k, v := range t.defaultTags {
tags[k] = v
}
for i, tag := range t.tags {
if i >= len(fields) {
continue
}
if tag == "measurement" {
measurement = append(measurement, fields[i])
} else if tag == "field" {
if len(field) != 0 {
return "", nil, "", fmt.Errorf("'field' can only be used once in each template: %q", line)
}
field = fields[i]
} else if tag == "measurement*" {
measurement = append(measurement, fields[i:]...)
break
} else if tag != "" {
tags[tag] = fields[i]
}
}
return strings.Join(measurement, t.separator), tags, field, nil
}
// matcher determines which template should be applied to a given metric
// based on a filter tree.
type matcher struct {
root *node
defaultTemplate *template
}
func newMatcher() *matcher {
return &matcher{
root: &node{},
}
}
// Add inserts the template in the filter tree based the given filter
func (m *matcher) Add(filter string, template *template) {
if filter == "" {
m.AddDefaultTemplate(template)
return
}
m.root.Insert(filter, template)
}
func (m *matcher) AddDefaultTemplate(template *template) {
m.defaultTemplate = template
}
// Match returns the template that matches the given graphite line
func (m *matcher) Match(line string) *template {
tmpl := m.root.Search(line)
if tmpl != nil {
return tmpl
}
return m.defaultTemplate
}
// node is an item in a sorted k-ary tree. Each child is sorted by its value.
// The special value of "*", is always last.
type node struct {
value string
children nodes
template *template
}
func (n *node) insert(values []string, template *template) {
// Add the end, set the template
if len(values) == 0 {
n.template = template
return
}
// See if the the current element already exists in the tree. If so, insert the
// into that sub-tree
for _, v := range n.children {
if v.value == values[0] {
v.insert(values[1:], template)
return
}
}
// New element, add it to the tree and sort the children
newNode := &node{value: values[0]}
n.children = append(n.children, newNode)
sort.Sort(&n.children)
// Now insert the rest of the tree into the new element
newNode.insert(values[1:], template)
}
// Insert inserts the given string template into the tree. The filter string is separated
// on "." and each part is used as the path in the tree.
func (n *node) Insert(filter string, template *template) {
n.insert(strings.Split(filter, "."), template)
}
func (n *node) search(lineParts []string) *template {
// Nothing to search
if len(lineParts) == 0 || len(n.children) == 0 {
return n.template
}
// If last element is a wildcard, don't include in this search since it's sorted
// to the end but lexicographically it would not always be and sort.Search assumes
// the slice is sorted.
length := len(n.children)
if n.children[length-1].value == "*" {
length--
}
// Find the index of child with an exact match
i := sort.Search(length, func(i int) bool {
return n.children[i].value >= lineParts[0]
})
// Found an exact match, so search that child sub-tree
if i < len(n.children) && n.children[i].value == lineParts[0] {
return n.children[i].search(lineParts[1:])
}
// Not an exact match, see if we have a wildcard child to search
if n.children[len(n.children)-1].value == "*" {
return n.children[len(n.children)-1].search(lineParts[1:])
}
return n.template
}
func (n *node) Search(line string) *template {
return n.search(strings.Split(line, "."))
}
type nodes []*node
// Less returns a boolean indicating whether the filter at position j
// is less than the filter at position k. Filters are order by string
// comparison of each component parts. A wildcard value "*" is never
// less than a non-wildcard value.
//
// For example, the filters:
// "*.*"
// "servers.*"
// "servers.localhost"
// "*.localhost"
//
// Would be sorted as:
// "servers.localhost"
// "servers.*"
// "*.localhost"
// "*.*"
func (n *nodes) Less(j, k int) bool {
if (*n)[j].value == "*" && (*n)[k].value != "*" {
return false
}
if (*n)[j].value != "*" && (*n)[k].value == "*" {
return true
}
return (*n)[j].value < (*n)[k].value
}
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
func (n *nodes) Len() int { return len(*n) }

View File

@ -0,0 +1,48 @@
package influx
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding"
)
type InfluxParser struct {
}
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics, err := telegraf.ParseMetrics(buf)
if err != nil {
return nil, err
}
return metrics, nil
}
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
}
return metrics[0], nil
}
func NewParser() *InfluxParser {
return &InfluxParser{}
}
func (p *InfluxParser) InitConfig(configs map[string]interface{}) error {
return nil
}
func init() {
encoding.Add("influx", func() encoding.Parser {
return NewParser()
})
}

View File

@ -0,0 +1,68 @@
package json
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding"
)
type JsonParser struct {
}
func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
var jsonOut interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
return nil, err
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC())
if err != nil {
return nil, err
}
return append(metrics, metric), nil
}
func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
}
return metrics[0], nil
}
func NewParser() *JsonParser {
return &JsonParser{}
}
func (p *JsonParser) InitConfig(configs map[string]interface{}) error {
return nil
}
func init() {
encoding.Add("json", func() encoding.Parser {
return NewParser()
})
}

View File

@ -1,7 +1,23 @@
# Exec Input Plugin
The exec plugin can execute arbitrary commands which output JSON or
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).
The exec plugin can execute arbitrary commands which output:
* JSON
* InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/)
* Graphite [graphite-protocol](http://graphite.readthedocs.org/en/latest/feeding-carbon.html)
> Graphite understands messages with this format:
> ```
metric_path value timestamp\n
```
> __metric_path__ is the metric namespace that you want to populate.
> __value__ is the value that you want to assign to the metric at this time.
> __timestamp__ is the unix epoch time.
If using JSON, only numeric values are parsed and turned into floats. Booleans
and strings will be ignored.
@ -11,21 +27,43 @@ and strings will be ignored.
```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Shell/commands array
# compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/mycollector --foo=bar"
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol)
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
#separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
#templates = [
# "*.app env.service.resource.measurement",
# "stats.* .host.measurement* region=us-west,agent=sensu",
# "stats2.* .host.measurement.field",
# "measurement*"
#]
```
Other options for modifying the measurement names are:
```
name_override = "measurement_name"
name_prefix = "prefix_"
```
@ -57,8 +95,11 @@ Now let's say we have the following configuration:
```
[[inputs.exec]]
# the command to run
command = "/usr/bin/line_protocol_collector"
# Shell/commands array
# compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/line_protocol_collector"
commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
@ -80,3 +121,63 @@ cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
You will get data in InfluxDB exactly as it is defined above,
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
and usage_busy. They will receive a timestamp at collection time.
### Example 3
We can also change the data_format to "graphite" to use the metrics collecting scripts such as (compatible with graphite):
* Nagios [Mertics Plugins] (https://exchange.nagios.org/directory/Plugins)
* Sensu [Mertics Plugins] (https://github.com/sensu-plugins)
#### Configuration
```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "graphite"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
```
And test.sh/test2.sh will output:
```
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
```
The templates configuration will be used to parse the graphite metrics to support influxdb/opentsdb tagging store engines.
More detail information about templates, please refer to [The graphite Input] (https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md)

View File

@ -2,55 +2,94 @@ package exec
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"time"
"sync"
"github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/internal/encoding/graphite"
_ "github.com/influxdata/telegraf/internal/encoding/influx"
_ "github.com/influxdata/telegraf/internal/encoding/json"
)
const sampleConfig = `
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Shell/commands array
# compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/mycollector --foo=bar"
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol)
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
`
type Exec struct {
Commands []string
Command string
DataFormat string
Separator string
Templates []string
encodingParser encoding.Parser
initedConfig bool
wg sync.WaitGroup
sync.Mutex
runner Runner
errc chan error
}
type Runner interface {
Run(*Exec) ([]byte, error)
Run(*Exec, string) ([]byte, error)
}
type CommandRunner struct{}
func (c CommandRunner) Run(e *Exec) ([]byte, error) {
split_cmd, err := shellquote.Split(e.Command)
func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
}
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command)
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}
return out.Bytes(), nil
@ -60,47 +99,88 @@ func NewExec() *Exec {
return &Exec{runner: CommandRunner{}}
}
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()
out, err := e.runner.Run(e, command)
if err != nil {
e.errc <- err
return
}
metrics, err := e.encodingParser.Parse(out)
if err != nil {
e.errc <- err
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
}
func (e *Exec) initConfig() error {
e.Lock()
defer e.Unlock()
if e.Command != "" && len(e.Commands) < 1 {
e.Commands = []string{e.Command}
}
if e.DataFormat == "" {
e.DataFormat = "json"
}
var err error
configs := make(map[string]interface{})
configs["Separator"] = e.Separator
configs["Templates"] = e.Templates
e.encodingParser, err = encoding.NewParser(e.DataFormat, configs)
if err != nil {
return fmt.Errorf("exec configuration is error: %s ", err.Error())
}
return nil
}
func (e *Exec) SampleConfig() string {
return sampleConfig
}
func (e *Exec) Description() string {
return "Read flattened metrics from one or more commands that output JSON to stdout"
return "Read metrics from one or more commands that can output JSON, influx or graphite line protocol to stdout"
}
func (e *Exec) Gather(acc telegraf.Accumulator) error {
out, err := e.runner.Run(e)
if err != nil {
if !e.initedConfig {
if err := e.initConfig(); err != nil {
return err
}
e.initedConfig = true
}
switch e.DataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}
e.Lock()
e.errc = make(chan error, 10)
e.Unlock()
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
for _, command := range e.Commands {
e.wg.Add(1)
go e.ProcessCommand(command, acc)
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
e.wg.Wait()
select {
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
}
close(e.errc)
return nil
case err := <-e.errc:
close(e.errc)
return err
}
}
func init() {

View File

@ -55,7 +55,7 @@ func newRunnerMock(out []byte, err error) Runner {
}
}
func (r runnerMock) Run(e *Exec) ([]byte, error) {
func (r runnerMock) Run(e *Exec, command string) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
@ -65,7 +65,7 @@ func (r runnerMock) Run(e *Exec) ([]byte, error) {
func TestExec(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Command: "testcommand arg1",
Commands: []string{"testcommand arg1"},
}
var acc testutil.Accumulator
@ -89,7 +89,7 @@ func TestExec(t *testing.T) {
func TestExecMalformed(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Command: "badcommand arg1",
Commands: []string{"badcommand arg1"},
}
var acc testutil.Accumulator
@ -101,7 +101,7 @@ func TestExecMalformed(t *testing.T) {
func TestCommandError(t *testing.T) {
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Command: "badcommand",
Commands: []string{"badcommand"},
}
var acc testutil.Accumulator
@ -113,7 +113,7 @@ func TestCommandError(t *testing.T) {
func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
Commands: []string{"line-protocol"},
DataFormat: "influx",
}
@ -135,7 +135,7 @@ func TestLineProtocolParse(t *testing.T) {
func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Command: "line-protocol",
Commands: []string{"line-protocol"},
DataFormat: "influx",
}
@ -162,7 +162,7 @@ func TestLineProtocolParseMultiple(t *testing.T) {
func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
Commands: []string{"bad data format"},
DataFormat: "FooBar",
}