From 3a54ef33f160e0c201e848d187424a4a2b6ac61e Mon Sep 17 00:00:00 2001 From: Henry Hu Date: Wed, 3 Feb 2016 11:25:01 +0800 Subject: [PATCH] Add graphite protocol support to exec plugin, and refactor the telegraf mertic paser to the internal/encoding direcotry. --- Godeps | 3 +- Godeps_windows | 3 +- .../encoding}/graphite/errors.go | 0 .../encoding/graphite/innerconfig.go | 29 +- .../encoding}/graphite/parser.go | 32 ++ internal/encoding/parser.go | 57 +++ plugins/inputs/all/all.go | 3 - plugins/inputs/exec/README.md | 115 ++++- plugins/inputs/exec/config.go | 39 ++ plugins/inputs/exec/exec.go | 156 +++++-- plugins/inputs/exec/exec_test.go | 20 +- plugins/inputs/execline/README.md | 151 ------- plugins/inputs/execline/config.go | 172 -------- plugins/inputs/execline/errors.go | 14 - plugins/inputs/execline/execline.go | 179 -------- plugins/inputs/execline/parser.go | 392 ------------------ plugins/inputs/graphite/README.md | 160 ------- plugins/inputs/graphite/config.go | 205 --------- plugins/inputs/graphite/graphite.go | 315 -------------- plugins/inputs/tail/README.md | 159 ------- plugins/inputs/tail/errors.go | 14 - plugins/inputs/tail/parser.go | 392 ------------------ plugins/inputs/tail/tail.go | 186 --------- 23 files changed, 374 insertions(+), 2422 deletions(-) rename {plugins/inputs => internal/encoding}/graphite/errors.go (100%) rename plugins/inputs/tail/config.go => internal/encoding/graphite/innerconfig.go (83%) rename {plugins/inputs => internal/encoding}/graphite/parser.go (93%) create mode 100644 internal/encoding/parser.go create mode 100644 plugins/inputs/exec/config.go delete mode 100644 plugins/inputs/execline/README.md delete mode 100644 plugins/inputs/execline/config.go delete mode 100644 plugins/inputs/execline/errors.go delete mode 100644 plugins/inputs/execline/execline.go delete mode 100644 plugins/inputs/execline/parser.go delete mode 100644 plugins/inputs/graphite/README.md delete mode 100644 plugins/inputs/graphite/config.go delete mode 100644 plugins/inputs/graphite/graphite.go delete mode 100644 plugins/inputs/tail/README.md delete mode 100644 plugins/inputs/tail/errors.go delete mode 100644 plugins/inputs/tail/parser.go delete mode 100644 plugins/inputs/tail/tail.go diff --git a/Godeps b/Godeps index 3967c32b8..7e43ed610 100644 --- a/Godeps +++ b/Godeps @@ -56,5 +56,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 -github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file diff --git a/Godeps_windows b/Godeps_windows index 0a661a901..829e2cb35 100644 --- a/Godeps_windows +++ b/Godeps_windows @@ -60,5 +60,4 @@ golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 -github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file diff --git a/plugins/inputs/graphite/errors.go b/internal/encoding/graphite/errors.go similarity index 100% rename from plugins/inputs/graphite/errors.go rename to internal/encoding/graphite/errors.go diff --git a/plugins/inputs/tail/config.go b/internal/encoding/graphite/innerconfig.go similarity index 83% rename from plugins/inputs/tail/config.go rename to internal/encoding/graphite/innerconfig.go index 0d6f19e7f..2fdab3687 100644 --- a/plugins/inputs/tail/config.go +++ b/internal/encoding/graphite/innerconfig.go @@ -1,4 +1,4 @@ -package tail +package graphite import ( "fmt" @@ -14,25 +14,14 @@ const ( ) // Config represents the configuration for Graphite endpoints. -type Config struct { - Files []string +type InnerConfig struct { Separator string Tags []string Templates []string } -// WithDefaults takes the given config and returns a new config with any required -// default values set. -func (c *Config) WithDefaults() *Config { - d := *c - if d.Separator == "" { - d.Separator = DefaultSeparator - } - return &d -} - // DefaultTags returns the config's tags. -func (c *Config) DefaultTags() models.Tags { +func (c *InnerConfig) DefaultTags() models.Tags { tags := models.Tags{} for _, t := range c.Tags { parts := strings.Split(t, "=") @@ -42,7 +31,7 @@ func (c *Config) DefaultTags() models.Tags { } // Validate validates the config's templates and tags. -func (c *Config) Validate() error { +func (c *InnerConfig) Validate() error { if err := c.validateTemplates(); err != nil { return err } @@ -54,7 +43,7 @@ func (c *Config) Validate() error { return nil } -func (c *Config) validateTemplates() error { +func (c *InnerConfig) validateTemplates() error { // map to keep track of filters we see filters := map[string]struct{}{} @@ -121,7 +110,7 @@ func (c *Config) validateTemplates() error { return nil } -func (c *Config) validateTags() error { +func (c *InnerConfig) validateTags() error { for _, t := range c.Tags { if err := c.validateTag(t); err != nil { return err @@ -130,7 +119,7 @@ func (c *Config) validateTags() error { return nil } -func (c *Config) validateTemplate(template string) error { +func (c *InnerConfig) validateTemplate(template string) error { hasMeasurement := false for _, p := range strings.Split(template, ".") { if p == "measurement" || p == "measurement*" { @@ -145,7 +134,7 @@ func (c *Config) validateTemplate(template string) error { return nil } -func (c *Config) validateFilter(filter string) error { +func (c *InnerConfig) validateFilter(filter string) error { for _, p := range strings.Split(filter, ".") { if p == "" { return fmt.Errorf("filter contains blank section: %s", filter) @@ -158,7 +147,7 @@ func (c *Config) validateFilter(filter string) error { return nil } -func (c *Config) validateTag(keyValue string) error { +func (c *InnerConfig) validateTag(keyValue string) error { parts := strings.Split(keyValue, "=") if len(parts) != 2 { return fmt.Errorf("invalid template tags: '%s'", keyValue) diff --git a/plugins/inputs/graphite/parser.go b/internal/encoding/graphite/parser.go similarity index 93% rename from plugins/inputs/graphite/parser.go rename to internal/encoding/graphite/parser.go index d28595f21..02611943b 100644 --- a/plugins/inputs/graphite/parser.go +++ b/internal/encoding/graphite/parser.go @@ -1,13 +1,17 @@ package graphite import ( + "bytes" "fmt" + "io" "math" "sort" "strconv" "strings" "time" + "bufio" + "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" ) @@ -93,6 +97,34 @@ func NewParser(templates []string, defaultTags models.Tags) (*Parser, error) { }) } +func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + + metrics := make([]telegraf.Metric, 0) + + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + for { + // Read up to the next newline. + buf, err := reader.ReadBytes('\n') + if err == io.EOF { + return metrics, nil + } + if err != nil && err != io.EOF { + return metrics, err + } + + // Trim the buffer, even though there should be no padding + line := strings.TrimSpace(string(buf)) + if metric, err := p.Parse(line); err == nil { + metrics = append(metrics, metric) + } + } + + return metrics, nil +} + // Parse performs Graphite parsing of a single line. func (p *Parser) Parse(line string) (telegraf.Metric, error) { // Break into 3 fields (name, value, timestamp). diff --git a/internal/encoding/parser.go b/internal/encoding/parser.go new file mode 100644 index 000000000..e6bd53520 --- /dev/null +++ b/internal/encoding/parser.go @@ -0,0 +1,57 @@ +package encoding + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/encoding/graphite" +) + +type Parser struct { + graphiteParser *graphite.Parser +} + +func NewParser(parser *graphite.Parser) *Parser { + return &Parser{graphiteParser: parser} +} + +func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error { + var err error + var metrics []telegraf.Metric + var metric telegraf.Metric + + switch dataFormat { + case "json": + var jsonOut interface{} + err = json.Unmarshal(out, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + break + } + + f := internal.JSONFlattener{} + err = f.FlattenJSON("", jsonOut) + if err != nil { + break + } + acc.AddFields("exec", f.Fields, nil) + case "influx": + now := time.Now() + metrics, err = telegraf.ParseMetrics(out) + for _, metric = range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) + } + case "graphite": + metrics, err = p.graphiteParser.ParseMetrics(out) + for _, metric = range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + default: + err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat) + } + + return err +} diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index bc3d6f311..52ab428f8 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -8,9 +8,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/docker" _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/exec" - _ "github.com/influxdata/telegraf/plugins/inputs/execline" _ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks" - _ "github.com/influxdata/telegraf/plugins/inputs/graphite" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" @@ -40,7 +38,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/system" - _ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index 1172140c7..5dabada6b 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -1,7 +1,23 @@ # Exec Input Plugin -The exec plugin can execute arbitrary commands which output JSON or -InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/). +The exec plugin can execute arbitrary commands which output: + +* JSON +* InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/) +* Graphite [graphite-protocol](http://graphite.readthedocs.org/en/latest/feeding-carbon.html) + +> Graphite understands messages with this format: + +> ``` +metric_path value timestamp\n +``` + +> __metric_path__ is the metric namespace that you want to populate. + +> __value__ is the value that you want to assign to the metric at this time. + +> __timestamp__ is the unix epoch time. + If using JSON, only numeric values are parsed and turned into floats. Booleans and strings will be ignored. @@ -11,21 +27,44 @@ and strings will be ignored. ``` # Read flattened metrics from one or more commands that output JSON to stdout [[inputs.exec]] - # the command to run - command = "/usr/bin/mycollector --foo=bar" + # Shell/commands array + commands = ["/tmp/test.sh","/tmp/test2.sh"] - # Data format to consume. This can be "json" or "influx" (line-protocol) + # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. data_format = "json" # measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ### Below configuration will be used for data_format = "graphite", can be ignored for other data_format + ### If matching multiple measurement files, this string will be used to join the matched values. + #separator = "." + + ### Default tags that will be added to all metrics. These can be overridden at the template level + ### or by tags extracted from metric + #tags = ["region=north-east", "zone=1c"] + + ### Each template line requires a template pattern. It can have an optional + ### filter before the template and separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces + ### similar to the line protocol format. The can be only one default template. + ### Templates support below format: + ### filter + template + ### filter + template + extra tag + ### filter + template with field key + ### default template. Ignore the first graphite component "servers" + #templates = [ + # "*.app env.service.resource.measurement", + # "stats.* .host.measurement* region=us-west,agent=sensu", + # "stats2.* .host.measurement.field", + # "measurement*" + #] ``` Other options for modifying the measurement names are: ``` -name_override = "measurement_name" name_prefix = "prefix_" ``` @@ -80,3 +119,67 @@ cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 You will get data in InfluxDB exactly as it is defined above, tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle and usage_busy. They will receive a timestamp at collection time. + + +### Example 3 + +We can also change the data_format to "graphite" to use the metrics collecting scripts such as (compatible with graphite): + +* Nagios [Mertics Plugins] (https://exchange.nagios.org/directory/Plugins) +* Sensu [Mertics Plugins] (https://github.com/sensu-plugins) + +#### Configuration +``` +# Read flattened metrics from one or more commands that output JSON to stdout +[[inputs.exec]] + # Shell/commands array + commands = ["/tmp/test.sh","/tmp/test2.sh"] + + # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) + # NOTE json only reads numerical measurements, strings and booleans are ignored. + data_format = "graphite" + + # measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Below configuration will be used for data_format = "graphite", can be ignored for other data_format + ### If matching multiple measurement files, this string will be used to join the matched values. + separator = "." + + ### Default tags that will be added to all metrics. These can be overridden at the template level + ### or by tags extracted from metric + tags = ["region=north-east", "zone=1c"] + + ### Each template line requires a template pattern. It can have an optional + ### filter before the template and separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces + ### similar to the line protocol format. The can be only one default template. + ### Templates support below format: + ### filter + template + ### filter + template + extra tag + ### filter + template with field key + ### default template. Ignore the first graphite component "servers" + templates = [ + "*.app env.service.resource.measurement", + "stats.* .host.measurement* region=us-west,agent=sensu", + "stats2.* .host.measurement.field", + "measurement*" + ] +``` + +And test.sh/test2.sh will output: + +``` +sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982 +sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982 +sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982 +sensu.metric.net.server0.eth0.tx_errors 0 1444234982 +sensu.metric.net.server0.eth0.rx_errors 0 1444234982 +sensu.metric.net.server0.eth0.tx_dropped 0 1444234982 +sensu.metric.net.server0.eth0.rx_dropped 0 1444234982 +``` + +The templates configuration will be used to parse the graphite metrics to support influxdb/opentsdb tagging store engines. + +More detail information about templates, please refer to [The graphite Input] (https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md) + diff --git a/plugins/inputs/exec/config.go b/plugins/inputs/exec/config.go new file mode 100644 index 000000000..b413cde69 --- /dev/null +++ b/plugins/inputs/exec/config.go @@ -0,0 +1,39 @@ +package exec + +import ( + + "github.com/influxdata/telegraf/internal/encoding/graphite" +) + +const ( +// DefaultSeparator is the default join character to use when joining multiple +// measurment parts in a template. + DefaultSeparator = "." +) + +// Config represents the configuration for Graphite endpoints. +type Config struct { + Commands []string + graphite.InnerConfig +} + +// New Config instance. +func NewConfig(commands, tags, templates []string, separator string) *Config { + c := &Config{} + c.Commands = commands + c.Tags = tags + c.Templates = templates + c.Separator = separator + return c +} + +// WithDefaults takes the given config and returns a new config with any required +// default values set. +func (c *Config) WithDefaults() *Config { + d := *c + if d.Separator == "" { + d.Separator = DefaultSeparator + } + return &d +} + diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index c4bb634ba..171c5ed5e 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -2,55 +2,96 @@ package exec import ( "bytes" - "encoding/json" "fmt" "os/exec" - "time" + "sync" "github.com/gonuts/go-shellquote" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/encoding" + "github.com/influxdata/telegraf/internal/encoding/graphite" "github.com/influxdata/telegraf/plugins/inputs" ) const sampleConfig = ` - # the command to run - command = "/usr/bin/mycollector --foo=bar" + # Shell/commands array + commands = ["/tmp/test.sh","/tmp/test2.sh"] - # Data format to consume. This can be "json" or "influx" (line-protocol) + # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. data_format = "json" # measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ### Below configuration will be used for data_format = "graphite", can be ignored for other data_format + ### If matching multiple measurement files, this string will be used to join the matched values. + separator = "." + + ### Default tags that will be added to all metrics. These can be overridden at the template level + ### or by tags extracted from metric + tags = ["region=north-east", "zone=1c"] + + ### Each template line requires a template pattern. It can have an optional + ### filter before the template and separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces + ### similar to the line protocol format. The can be only one default template. + ### Templates support below format: + ### filter + template + ### filter + template + extra tag + ### filter + template with field key + ### default template. Ignore the first graphite component "servers" + templates = [ + "*.app env.service.resource.measurement", + "stats.* .host.measurement* region=us-west,agent=sensu", + "stats2.* .host.measurement.field", + "measurement*" + ] ` type Exec struct { - Command string + Commands []string DataFormat string + Separator string + Tags []string + Templates []string + + encodingParser *encoding.Parser + + config *Config + + initedConfig bool + + wg sync.WaitGroup + sync.Mutex + runner Runner + errc chan error } type Runner interface { - Run(*Exec) ([]byte, error) + Run(*Exec, string) ([]byte, error) } type CommandRunner struct{} -func (c CommandRunner) Run(e *Exec) ([]byte, error) { - split_cmd, err := shellquote.Split(e.Command) +func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) { + split_cmd, err := shellquote.Split(command) if err != nil || len(split_cmd) == 0 { return nil, fmt.Errorf("exec: unable to parse command, %s", err) } cmd := exec.Command(split_cmd[0], split_cmd[1:]...) + //name := strings.Replace(filepath.Base(cmd.Path), "/", "_", -1) + //name = strings.Replace(name, ".", "_", -1) + var out bytes.Buffer cmd.Stdout = &out if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command) + return nil, fmt.Errorf("exec: %s for command '%s'", err, command) } return out.Bytes(), nil @@ -60,47 +101,82 @@ func NewExec() *Exec { return &Exec{runner: CommandRunner{}} } +func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { + defer e.wg.Done() + + out, err := e.runner.Run(e, command) + if err != nil { + e.errc <- err + return + } + + if err = e.encodingParser.Parse(e.DataFormat, out, acc); err != nil { + e.errc <- err + } +} + +func (e *Exec) initConfig() error { + e.Lock() + defer e.Unlock() + + c := NewConfig(e.Commands, e.Tags, e.Templates, e.Separator) + c.WithDefaults() + if err := c.Validate(); err != nil { + return fmt.Errorf("exec configuration is error! ", err.Error()) + + } + e.config = c + + graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ + Templates: e.config.Templates, + DefaultTags: e.config.DefaultTags(), + Separator: e.config.Separator}) + + if err != nil { + return fmt.Errorf("exec input parser config is error! ", err.Error()) + } + + e.encodingParser = encoding.NewParser(graphiteParser) + + return nil +} + func (e *Exec) SampleConfig() string { return sampleConfig } func (e *Exec) Description() string { - return "Read flattened metrics from one or more commands that output JSON to stdout" + return "Read metrics from one or more commands that output graphite line protocol to stdout" } func (e *Exec) Gather(acc telegraf.Accumulator) error { - out, err := e.runner.Run(e) - if err != nil { - return err - } - switch e.DataFormat { - case "", "json": - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", - e.Command, err) - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { + if !e.initedConfig { + if err := e.initConfig(); err != nil { return err } - acc.AddFields("exec", f.Fields, nil) - case "influx": - now := time.Now() - metrics, err := telegraf.ParseMetrics(out) - for _, metric := range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) - } - return err - default: - return fmt.Errorf("Unsupported data format: %s. Must be either json "+ - "or influx.", e.DataFormat) + e.initedConfig = true } - return nil + + e.Lock() + e.errc = make(chan error, 10) + e.Unlock() + + for _, command := range e.Commands { + e.wg.Add(1) + go e.ProcessCommand(command, acc) + } + e.wg.Wait() + + select { + default: + close(e.errc) + return nil + case err := <-e.errc: + close(e.errc) + return err + } + } func init() { diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 709308fce..4be11e611 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -55,7 +55,7 @@ func newRunnerMock(out []byte, err error) Runner { } } -func (r runnerMock) Run(e *Exec) ([]byte, error) { +func (r runnerMock) Run(e *Exec, command string) ([]byte, error) { if r.err != nil { return nil, r.err } @@ -64,8 +64,8 @@ func (r runnerMock) Run(e *Exec) ([]byte, error) { func TestExec(t *testing.T) { e := &Exec{ - runner: newRunnerMock([]byte(validJson), nil), - Command: "testcommand arg1", + runner: newRunnerMock([]byte(validJson), nil), + Commands: []string{"testcommand arg1"}, } var acc testutil.Accumulator @@ -88,8 +88,8 @@ func TestExec(t *testing.T) { func TestExecMalformed(t *testing.T) { e := &Exec{ - runner: newRunnerMock([]byte(malformedJson), nil), - Command: "badcommand arg1", + runner: newRunnerMock([]byte(malformedJson), nil), + Commands: []string{"badcommand arg1"}, } var acc testutil.Accumulator @@ -100,8 +100,8 @@ func TestExecMalformed(t *testing.T) { func TestCommandError(t *testing.T) { e := &Exec{ - runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), - Command: "badcommand", + runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), + Commands: []string{"badcommand"}, } var acc testutil.Accumulator @@ -113,7 +113,7 @@ func TestCommandError(t *testing.T) { func TestLineProtocolParse(t *testing.T) { e := &Exec{ runner: newRunnerMock([]byte(lineProtocol), nil), - Command: "line-protocol", + Commands: []string{"line-protocol"}, DataFormat: "influx", } @@ -135,7 +135,7 @@ func TestLineProtocolParse(t *testing.T) { func TestLineProtocolParseMultiple(t *testing.T) { e := &Exec{ runner: newRunnerMock([]byte(lineProtocolMulti), nil), - Command: "line-protocol", + Commands: []string{"line-protocol"}, DataFormat: "influx", } @@ -162,7 +162,7 @@ func TestLineProtocolParseMultiple(t *testing.T) { func TestInvalidDataFormat(t *testing.T) { e := &Exec{ runner: newRunnerMock([]byte(lineProtocol), nil), - Command: "bad data format", + Commands: []string{"bad data format"}, DataFormat: "FooBar", } diff --git a/plugins/inputs/execline/README.md b/plugins/inputs/execline/README.md deleted file mode 100644 index d73520e11..000000000 --- a/plugins/inputs/execline/README.md +++ /dev/null @@ -1,151 +0,0 @@ -# ExecLine Plugin -The exec plugin can execute arbitrary commands which output graphite line protocol. - -## Parsing Metrics - -The graphite plugin allows measurements to be saved using the graphite line protocol. By default, enabling the graphite plugin will allow you to collect metrics and store them using the metric name as the measurement. If you send a metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags. - -While this default setup works, it is not the ideal way to store measurements in InfluxDB since it does not take advantage of tags. It also will not perform optimally with a large dataset sizes since queries will be forced to use regexes which is known to not scale well. - -To extract tags from metrics, one or more templates must be configured to parse metrics into tags and measurements. - -## Templates - -Templates allow matching parts of a metric name to be used as tag keys in the stored metric. They have a similar format to graphite metric names. The values in between the separators are used as the tag keys. The location of the tag key that matches the same position as the graphite metric section is used as the value. If there is no value, the graphite portion is skipped. - -The special value _measurement_ is used to define the measurement name. It can have a trailing `*` to indicate that the remainder of the metric should be used. If a _measurement_ is not specified, the full metric name is used. - -### Basic Matching - -`servers.localhost.cpu.loadavg.10` -* Template: `.host.resource.measurement*` -* Output: _measurement_ =`loadavg.10` _tags_ =`host=localhost resource=cpu` - -### Multiple Measurement Matching - -The _measurement_ can be specified multiple times in a template to provide more control over the measurement name. Multiple values -will be joined together using the _Separator_ config variable. By default, this value is `.`. - -`servers.localhost.cpu.cpu0.user` -* Template: `.host.measurement.cpu.measurement` -* Output: _measurement_ = `cpu.user` _tags_ = `host=localhost cpu=cpu0` - -Since '.' requires queries on measurements to be double-quoted, you may want to set this to `_` to simplify querying parsed metrics. - -`servers.localhost.cpu.cpu0.user` -* Separator: `_` -* Template: `.host.measurement.cpu.measurement` -* Output: _measurement_ = `cpu_user` _tags_ = `host=localhost cpu=cpu0` - -### Adding Tags - -Additional tags can be added to a metric that don't exist on the received metric. You can add additional tags by specifying them after the pattern. Tags have the same format as the line protocol. Multiple tags are separated by commas. - -`servers.localhost.cpu.loadavg.10` -* Template: `.host.resource.measurement* region=us-west,zone=1a` -* Output: _measurement_ = `loadavg.10` _tags_ = `host=localhost resource=cpu region=us-west zone=1a` - -### Fields - -A field key can be specified by using the keyword _field_. By default if no _field_ keyword is specified then the metric will be written to a field named _value_. - -When using the current default engine _BZ1_, it's recommended to use a single field per value for performance reasons. - -When using the _TSM1_ engine it's possible to amend measurement metrics with additional fields, e.g: - -Input: -``` -sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982 -sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982 -sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982 -sensu.metric.net.server0.eth0.tx_errors 0 1444234982 -sensu.metric.net.server0.eth0.rx_errors 0 1444234982 -sensu.metric.net.server0.eth0.tx_dropped 0 1444234982 -sensu.metric.net.server0.eth0.rx_dropped 0 1444234982 -``` - -With template: -``` -sensu.metric.* ..measurement.host.interface.field -``` - -Becomes database entry: -``` -> select * from net -name: net ---------- -time host interface rx_bytes rx_dropped rx_errors rx_packets tx_bytes tx_dropped tx_errors -1444234982000000000 server0 eth0 1.015633926034834e+15 0 0 4.61295119435e+11 1.09308649338848e+15 0 0 -``` - -## Multiple Templates - -One template may not match all metrics. For example, using multiple plugins with diamond will produce metrics in different formats. If you need to use multiple templates, you'll need to define a prefix filter that must match before the template can be applied. - -### Filters - -Filters have a similar format to templates but work more like wildcard expressions. When multiple filters would match a metric, the more specific one is chosen. Filters are configured by adding them before the template. - -For example, - -``` -servers.localhost.cpu.loadavg.10 -servers.host123.elasticsearch.cache_hits 100 -servers.host456.mysql.tx_count 10 -servers.host789.prod.mysql.tx_count 10 -``` -* `servers.*` would match all values -* `servers.*.mysql` would match `servers.host456.mysql.tx_count 10` -* `servers.localhost.*` would match `servers.localhost.cpu.loadavg` -* `servers.*.*.mysql` would match `servers.host789.prod.mysql.tx_count 10` - -## Default Templates - -If no template filters are defined or you want to just have one basic template, you can define a default template. This template will apply to any metric that has not already matched a filter. - -``` -dev.http.requests.200 -prod.myapp.errors.count -dev.db.queries.count -``` - -* `env.app.measurement*` would create - * _measurement_=`requests.200` _tags_=`env=dev,app=http` - * _measurement_= `errors.count` _tags_=`env=prod,app=myapp` - * _measurement_=`queries.count` _tags_=`env=dev,app=db` - -## Global Tags - -If you need to add the same set of tags to all metrics, you can define them globally at the plugin level and not within each template description. - - -# Minimal Config - -``` - # NOTE This execline plugin only reads numerical measurements output by commands, - # strings and booleans ill be ignored. - commands = ["/tmp/test.sh","/tmp/test2.sh"] # the bind address - - ### If matching multiple measurement files, this string will be used to join the matched values. - separator = "." - - ### Default tags that will be added to all metrics. These can be overridden at the template level - ### or by tags extracted from metric - tags = ["region=north-china", "zone=1c"] - - ### Each template line requires a template pattern. It can have an optional - ### filter before the template and separated by spaces. It can also have optional extra - ### tags following the template. Multiple tags should be separated by commas and no spaces - ### similar to the line protocol format. The can be only one default template. - ### Templates support below format: - ### filter + template - ### filter + template + extra tag - ### filter + template with field key - ### default template. Ignore the first graphite component "servers" - templates = [ - "*.app env.service.resource.measurement", - "stats.* .host.measurement* region=us-west,agent=sensu", - "stats2.* .host.measurement.field", - "measurement*" - ] -``` diff --git a/plugins/inputs/execline/config.go b/plugins/inputs/execline/config.go deleted file mode 100644 index 424b9e47d..000000000 --- a/plugins/inputs/execline/config.go +++ /dev/null @@ -1,172 +0,0 @@ -package execline - -import ( - "fmt" - "strings" - - "github.com/influxdata/influxdb/models" -) - -const ( - // DefaultSeparator is the default join character to use when joining multiple - // measurment parts in a template. - DefaultSeparator = "." -) - -// Config represents the configuration for Graphite endpoints. -type Config struct { - Commands []string - Separator string - Tags []string - Templates []string -} - -// WithDefaults takes the given config and returns a new config with any required -// default values set. -func (c *Config) WithDefaults() *Config { - d := *c - if d.Separator == "" { - d.Separator = DefaultSeparator - } - return &d -} - -// DefaultTags returns the config's tags. -func (c *Config) DefaultTags() models.Tags { - tags := models.Tags{} - for _, t := range c.Tags { - parts := strings.Split(t, "=") - tags[parts[0]] = parts[1] - } - return tags -} - -// Validate validates the config's templates and tags. -func (c *Config) Validate() error { - if err := c.validateTemplates(); err != nil { - return err - } - - if err := c.validateTags(); err != nil { - return err - } - - return nil -} - -func (c *Config) validateTemplates() error { - // map to keep track of filters we see - filters := map[string]struct{}{} - - for i, t := range c.Templates { - parts := strings.Fields(t) - // Ensure template string is non-empty - if len(parts) == 0 { - return fmt.Errorf("missing template at position: %d", i) - } - if len(parts) == 1 && parts[0] == "" { - return fmt.Errorf("missing template at position: %d", i) - } - - if len(parts) > 3 { - return fmt.Errorf("invalid template format: '%s'", t) - } - - template := t - filter := "" - tags := "" - if len(parts) >= 2 { - // We could have