Add graphite protocol support to exec plugin, and refactor the telegraf mertic paser to the internal/encoding direcotry.

This commit is contained in:
Henry Hu 2016-02-03 11:25:01 +08:00
parent f454ca7c8a
commit 3a54ef33f1
23 changed files with 374 additions and 2422 deletions

3
Godeps
View File

@ -56,5 +56,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4

View File

@ -60,5 +60,4 @@ golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4

View File

@ -1,4 +1,4 @@
package tail
package graphite
import (
"fmt"
@ -14,25 +14,14 @@ const (
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
Files []string
type InnerConfig struct {
Separator string
Tags []string
Templates []string
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.Separator == "" {
d.Separator = DefaultSeparator
}
return &d
}
// DefaultTags returns the config's tags.
func (c *Config) DefaultTags() models.Tags {
func (c *InnerConfig) DefaultTags() models.Tags {
tags := models.Tags{}
for _, t := range c.Tags {
parts := strings.Split(t, "=")
@ -42,7 +31,7 @@ func (c *Config) DefaultTags() models.Tags {
}
// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
func (c *InnerConfig) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}
@ -54,7 +43,7 @@ func (c *Config) Validate() error {
return nil
}
func (c *Config) validateTemplates() error {
func (c *InnerConfig) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}
@ -121,7 +110,7 @@ func (c *Config) validateTemplates() error {
return nil
}
func (c *Config) validateTags() error {
func (c *InnerConfig) validateTags() error {
for _, t := range c.Tags {
if err := c.validateTag(t); err != nil {
return err
@ -130,7 +119,7 @@ func (c *Config) validateTags() error {
return nil
}
func (c *Config) validateTemplate(template string) error {
func (c *InnerConfig) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
@ -145,7 +134,7 @@ func (c *Config) validateTemplate(template string) error {
return nil
}
func (c *Config) validateFilter(filter string) error {
func (c *InnerConfig) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
@ -158,7 +147,7 @@ func (c *Config) validateFilter(filter string) error {
return nil
}
func (c *Config) validateTag(keyValue string) error {
func (c *InnerConfig) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)

View File

@ -1,13 +1,17 @@
package graphite
import (
"bytes"
"fmt"
"io"
"math"
"sort"
"strconv"
"strings"
"time"
"bufio"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf"
)
@ -93,6 +97,34 @@ func NewParser(templates []string, defaultTags models.Tags) (*Parser, error) {
})
}
func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics := make([]telegraf.Metric, 0)
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err == io.EOF {
return metrics, nil
}
if err != nil && err != io.EOF {
return metrics, err
}
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
if metric, err := p.Parse(line); err == nil {
metrics = append(metrics, metric)
}
}
return metrics, nil
}
// Parse performs Graphite parsing of a single line.
func (p *Parser) Parse(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).

View File

@ -0,0 +1,57 @@
package encoding
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding/graphite"
)
type Parser struct {
graphiteParser *graphite.Parser
}
func NewParser(parser *graphite.Parser) *Parser {
return &Parser{graphiteParser: parser}
}
func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error {
var err error
var metrics []telegraf.Metric
var metric telegraf.Metric
switch dataFormat {
case "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
break
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
break
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err = telegraf.ParseMetrics(out)
for _, metric = range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
case "graphite":
metrics, err = p.graphiteParser.ParseMetrics(out)
for _, metric = range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
default:
err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat)
}
return err
}

View File

@ -8,9 +8,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/docker"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/execline"
_ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/graphite"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
@ -40,7 +38,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tail"
_ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"

View File

@ -1,7 +1,23 @@
# Exec Input Plugin
The exec plugin can execute arbitrary commands which output JSON or
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).
The exec plugin can execute arbitrary commands which output:
* JSON
* InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/)
* Graphite [graphite-protocol](http://graphite.readthedocs.org/en/latest/feeding-carbon.html)
> Graphite understands messages with this format:
> ```
metric_path value timestamp\n
```
> __metric_path__ is the metric namespace that you want to populate.
> __value__ is the value that you want to assign to the metric at this time.
> __timestamp__ is the unix epoch time.
If using JSON, only numeric values are parsed and turned into floats. Booleans
and strings will be ignored.
@ -11,21 +27,44 @@ and strings will be ignored.
```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol)
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
#separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
#tags = ["region=north-east", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
#templates = [
# "*.app env.service.resource.measurement",
# "stats.* .host.measurement* region=us-west,agent=sensu",
# "stats2.* .host.measurement.field",
# "measurement*"
#]
```
Other options for modifying the measurement names are:
```
name_override = "measurement_name"
name_prefix = "prefix_"
```
@ -80,3 +119,67 @@ cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
You will get data in InfluxDB exactly as it is defined above,
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
and usage_busy. They will receive a timestamp at collection time.
### Example 3
We can also change the data_format to "graphite" to use the metrics collecting scripts such as (compatible with graphite):
* Nagios [Mertics Plugins] (https://exchange.nagios.org/directory/Plugins)
* Sensu [Mertics Plugins] (https://github.com/sensu-plugins)
#### Configuration
```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "graphite"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-east", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
```
And test.sh/test2.sh will output:
```
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
```
The templates configuration will be used to parse the graphite metrics to support influxdb/opentsdb tagging store engines.
More detail information about templates, please refer to [The graphite Input] (https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md)

View File

@ -0,0 +1,39 @@
package exec
import (
"github.com/influxdata/telegraf/internal/encoding/graphite"
)
const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
Commands []string
graphite.InnerConfig
}
// New Config instance.
func NewConfig(commands, tags, templates []string, separator string) *Config {
c := &Config{}
c.Commands = commands
c.Tags = tags
c.Templates = templates
c.Separator = separator
return c
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.Separator == "" {
d.Separator = DefaultSeparator
}
return &d
}

View File

@ -2,55 +2,96 @@ package exec
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"time"
"sync"
"github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding"
"github.com/influxdata/telegraf/internal/encoding/graphite"
"github.com/influxdata/telegraf/plugins/inputs"
)
const sampleConfig = `
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol)
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-east", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
`
type Exec struct {
Command string
Commands []string
DataFormat string
Separator string
Tags []string
Templates []string
encodingParser *encoding.Parser
config *Config
initedConfig bool
wg sync.WaitGroup
sync.Mutex
runner Runner
errc chan error
}
type Runner interface {
Run(*Exec) ([]byte, error)
Run(*Exec, string) ([]byte, error)
}
type CommandRunner struct{}
func (c CommandRunner) Run(e *Exec) ([]byte, error) {
split_cmd, err := shellquote.Split(e.Command)
func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
}
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
//name := strings.Replace(filepath.Base(cmd.Path), "/", "_", -1)
//name = strings.Replace(name, ".", "_", -1)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command)
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}
return out.Bytes(), nil
@ -60,47 +101,82 @@ func NewExec() *Exec {
return &Exec{runner: CommandRunner{}}
}
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()
out, err := e.runner.Run(e, command)
if err != nil {
e.errc <- err
return
}
if err = e.encodingParser.Parse(e.DataFormat, out, acc); err != nil {
e.errc <- err
}
}
func (e *Exec) initConfig() error {
e.Lock()
defer e.Unlock()
c := NewConfig(e.Commands, e.Tags, e.Templates, e.Separator)
c.WithDefaults()
if err := c.Validate(); err != nil {
return fmt.Errorf("exec configuration is error! ", err.Error())
}
e.config = c
graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{
Templates: e.config.Templates,
DefaultTags: e.config.DefaultTags(),
Separator: e.config.Separator})
if err != nil {
return fmt.Errorf("exec input parser config is error! ", err.Error())
}
e.encodingParser = encoding.NewParser(graphiteParser)
return nil
}
func (e *Exec) SampleConfig() string {
return sampleConfig
}
func (e *Exec) Description() string {
return "Read flattened metrics from one or more commands that output JSON to stdout"
return "Read metrics from one or more commands that output graphite line protocol to stdout"
}
func (e *Exec) Gather(acc telegraf.Accumulator) error {
out, err := e.runner.Run(e)
if err != nil {
return err
}
switch e.DataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
if !e.initedConfig {
if err := e.initConfig(); err != nil {
return err
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
e.initedConfig = true
}
return nil
e.Lock()
e.errc = make(chan error, 10)
e.Unlock()
for _, command := range e.Commands {
e.wg.Add(1)
go e.ProcessCommand(command, acc)
}
e.wg.Wait()
select {
default:
close(e.errc)
return nil
case err := <-e.errc:
close(e.errc)
return err
}
}
func init() {

View File

@ -55,7 +55,7 @@ func newRunnerMock(out []byte, err error) Runner {
}
}
func (r runnerMock) Run(e *Exec) ([]byte, error) {
func (r runnerMock) Run(e *Exec, command string) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
@ -64,8 +64,8 @@ func (r runnerMock) Run(e *Exec) ([]byte, error) {
func TestExec(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Command: "testcommand arg1",
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
}
var acc testutil.Accumulator
@ -88,8 +88,8 @@ func TestExec(t *testing.T) {
func TestExecMalformed(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Command: "badcommand arg1",
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
}
var acc testutil.Accumulator
@ -100,8 +100,8 @@ func TestExecMalformed(t *testing.T) {
func TestCommandError(t *testing.T) {
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Command: "badcommand",
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
}
var acc testutil.Accumulator
@ -113,7 +113,7 @@ func TestCommandError(t *testing.T) {
func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
Commands: []string{"line-protocol"},
DataFormat: "influx",
}
@ -135,7 +135,7 @@ func TestLineProtocolParse(t *testing.T) {
func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Command: "line-protocol",
Commands: []string{"line-protocol"},
DataFormat: "influx",
}
@ -162,7 +162,7 @@ func TestLineProtocolParseMultiple(t *testing.T) {
func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
Commands: []string{"bad data format"},
DataFormat: "FooBar",
}

View File

@ -1,151 +0,0 @@
# ExecLine Plugin
The exec plugin can execute arbitrary commands which output graphite line protocol.
## Parsing Metrics
The graphite plugin allows measurements to be saved using the graphite line protocol. By default, enabling the graphite plugin will allow you to collect metrics and store them using the metric name as the measurement. If you send a metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags.
While this default setup works, it is not the ideal way to store measurements in InfluxDB since it does not take advantage of tags. It also will not perform optimally with a large dataset sizes since queries will be forced to use regexes which is known to not scale well.
To extract tags from metrics, one or more templates must be configured to parse metrics into tags and measurements.
## Templates
Templates allow matching parts of a metric name to be used as tag keys in the stored metric. They have a similar format to graphite metric names. The values in between the separators are used as the tag keys. The location of the tag key that matches the same position as the graphite metric section is used as the value. If there is no value, the graphite portion is skipped.
The special value _measurement_ is used to define the measurement name. It can have a trailing `*` to indicate that the remainder of the metric should be used. If a _measurement_ is not specified, the full metric name is used.
### Basic Matching
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement*`
* Output: _measurement_ =`loadavg.10` _tags_ =`host=localhost resource=cpu`
### Multiple Measurement Matching
The _measurement_ can be specified multiple times in a template to provide more control over the measurement name. Multiple values
will be joined together using the _Separator_ config variable. By default, this value is `.`.
`servers.localhost.cpu.cpu0.user`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu.user` _tags_ = `host=localhost cpu=cpu0`
Since '.' requires queries on measurements to be double-quoted, you may want to set this to `_` to simplify querying parsed metrics.
`servers.localhost.cpu.cpu0.user`
* Separator: `_`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu_user` _tags_ = `host=localhost cpu=cpu0`
### Adding Tags
Additional tags can be added to a metric that don't exist on the received metric. You can add additional tags by specifying them after the pattern. Tags have the same format as the line protocol. Multiple tags are separated by commas.
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement* region=us-west,zone=1a`
* Output: _measurement_ = `loadavg.10` _tags_ = `host=localhost resource=cpu region=us-west zone=1a`
### Fields
A field key can be specified by using the keyword _field_. By default if no _field_ keyword is specified then the metric will be written to a field named _value_.
When using the current default engine _BZ1_, it's recommended to use a single field per value for performance reasons.
When using the _TSM1_ engine it's possible to amend measurement metrics with additional fields, e.g:
Input:
```
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
```
With template:
```
sensu.metric.* ..measurement.host.interface.field
```
Becomes database entry:
```
> select * from net
name: net
---------
time host interface rx_bytes rx_dropped rx_errors rx_packets tx_bytes tx_dropped tx_errors
1444234982000000000 server0 eth0 1.015633926034834e+15 0 0 4.61295119435e+11 1.09308649338848e+15 0 0
```
## Multiple Templates
One template may not match all metrics. For example, using multiple plugins with diamond will produce metrics in different formats. If you need to use multiple templates, you'll need to define a prefix filter that must match before the template can be applied.
### Filters
Filters have a similar format to templates but work more like wildcard expressions. When multiple filters would match a metric, the more specific one is chosen. Filters are configured by adding them before the template.
For example,
```
servers.localhost.cpu.loadavg.10
servers.host123.elasticsearch.cache_hits 100
servers.host456.mysql.tx_count 10
servers.host789.prod.mysql.tx_count 10
```
* `servers.*` would match all values
* `servers.*.mysql` would match `servers.host456.mysql.tx_count 10`
* `servers.localhost.*` would match `servers.localhost.cpu.loadavg`
* `servers.*.*.mysql` would match `servers.host789.prod.mysql.tx_count 10`
## Default Templates
If no template filters are defined or you want to just have one basic template, you can define a default template. This template will apply to any metric that has not already matched a filter.
```
dev.http.requests.200
prod.myapp.errors.count
dev.db.queries.count
```
* `env.app.measurement*` would create
* _measurement_=`requests.200` _tags_=`env=dev,app=http`
* _measurement_= `errors.count` _tags_=`env=prod,app=myapp`
* _measurement_=`queries.count` _tags_=`env=dev,app=db`
## Global Tags
If you need to add the same set of tags to all metrics, you can define them globally at the plugin level and not within each template description.
# Minimal Config
```
# NOTE This execline plugin only reads numerical measurements output by commands,
# strings and booleans ill be ignored.
commands = ["/tmp/test.sh","/tmp/test2.sh"] # the bind address
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-china", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
```

View File

@ -1,172 +0,0 @@
package execline
import (
"fmt"
"strings"
"github.com/influxdata/influxdb/models"
)
const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
Commands []string
Separator string
Tags []string
Templates []string
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.Separator == "" {
d.Separator = DefaultSeparator
}
return &d
}
// DefaultTags returns the config's tags.
func (c *Config) DefaultTags() models.Tags {
tags := models.Tags{}
for _, t := range c.Tags {
parts := strings.Split(t, "=")
tags[parts[0]] = parts[1]
}
return tags
}
// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}
if err := c.validateTags(); err != nil {
return err
}
return nil
}
func (c *Config) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}
for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}
template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}
if len(parts) == 3 {
tags = parts[2]
}
// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}
// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}
if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}
if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}
func (c *Config) validateTags() error {
for _, t := range c.Tags {
if err := c.validateTag(t); err != nil {
return err
}
}
return nil
}
func (c *Config) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}
if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}
return nil
}
func (c *Config) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}
if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}
func (c *Config) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}
if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}
return nil
}

View File

@ -1,14 +0,0 @@
package execline
import "fmt"
// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}
func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}

View File

@ -1,179 +0,0 @@
package execline
import (
"bufio"
"bytes"
"fmt"
"math"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
const sampleConfig = `
# NOTE This execline plugin only reads numerical measurements output by commands,
# strings and booleans ill be ignored.
commands = ["/tmp/test.sh","/tmp/test2.sh"] # the bind address
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-china", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
`
type ExecLine struct {
Commands []string
Separator string
Tags []string
Templates []string
parser *Parser
config *Config
initedConfig bool
wg sync.WaitGroup
sync.Mutex
}
func (e *ExecLine) Run(command string, acc telegraf.Accumulator) error {
defer e.wg.Done()
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return fmt.Errorf("execline: unable to parse command, %s", err)
}
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
name := strings.Replace(filepath.Base(cmd.Path), "/", "_", -1)
name = strings.Replace(name, ".", "_", -1)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
return fmt.Errorf("execline: %s for command '%s'", err, command)
}
reader := bufio.NewReader(&out)
for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err != nil {
return err
}
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
e.handleLine(name, acc, line)
}
return nil
}
func (e *ExecLine) handleLine(name string, acc telegraf.Accumulator, line string) {
if line == "" {
return
}
// Parse it.
metric, err := e.parser.Parse(line)
if err != nil {
switch err := err.(type) {
case *UnsupposedValueError:
// Graphite ignores NaN values with no error.
if math.IsNaN(err.Value) {
return
}
}
fmt.Errorf("unable to parse line: %s: %s", line, err)
return
}
acc.AddFields(name+"."+metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
func (e *ExecLine) initConfig() error {
e.Lock()
defer e.Unlock()
c := &Config{
Commands: e.Commands,
Separator: e.Separator,
Tags: e.Tags,
Templates: e.Templates,
}
c.WithDefaults()
if err := c.Validate(); err != nil {
return fmt.Errorf("ExecLine configuration is error! ", err.Error())
}
e.config = c
parser, err := NewParserWithOptions(Options{
Templates: e.config.Templates,
DefaultTags: e.config.DefaultTags(),
Separator: e.config.Separator})
if err != nil {
return fmt.Errorf("ExecLine input parser config is error! ", err.Error())
}
e.parser = parser
return nil
}
func (e *ExecLine) SampleConfig() string {
return sampleConfig
}
func (e *ExecLine) Description() string {
return "Read metrics from one or more commands that output graphite line protocol to stdout"
}
func (e *ExecLine) Gather(acc telegraf.Accumulator) error {
if !e.initedConfig {
if err := e.initConfig(); err != nil {
return err
}
e.initedConfig = true
}
for _, command := range e.Commands {
e.wg.Add(1)
go e.Run(command, acc)
}
e.wg.Wait()
return nil
}
func init() {
inputs.Add("execline", func() telegraf.Input {
return &ExecLine{}
})
}

View File

@ -1,392 +0,0 @@
package execline
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf"
)
// Minimum and maximum supported dates for timestamps.
var (
MinDate = time.Date(1901, 12, 13, 0, 0, 0, 0, time.UTC)
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
)
var defaultTemplate *template
func init() {
var err error
defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator)
if err != nil {
panic(err)
}
}
// Parser encapsulates a Graphite Parser.
type Parser struct {
matcher *matcher
tags models.Tags
}
// Options are configurable values that can be provided to a Parser
type Options struct {
Separator string
Templates []string
DefaultTags models.Tags
}
// NewParserWithOptions returns a graphite parser using the given options
func NewParserWithOptions(options Options) (*Parser, error) {
matcher := newMatcher()
matcher.AddDefaultTemplate(defaultTemplate)
for _, pattern := range options.Templates {
template := pattern
filter := ""
// Format is [filter] <template> [tag1=value1,tag2=value2]
parts := strings.Fields(pattern)
if len(parts) < 1 {
continue
} else if len(parts) >= 2 {
if strings.Contains(parts[1], "=") {
template = parts[0]
} else {
filter = parts[0]
template = parts[1]
}
}
// Parse out the default tags specific to this template
tags := models.Tags{}
if strings.Contains(parts[len(parts)-1], "=") {
tagStrs := strings.Split(parts[len(parts)-1], ",")
for _, kv := range tagStrs {
parts := strings.Split(kv, "=")
tags[parts[0]] = parts[1]
}
}
tmpl, err := NewTemplate(template, tags, options.Separator)
if err != nil {
return nil, err
}
matcher.Add(filter, tmpl)
}
return &Parser{matcher: matcher, tags: options.DefaultTags}, nil
}
// NewParser returns a GraphiteParser instance.
func NewParser(templates []string, defaultTags models.Tags) (*Parser, error) {
return NewParserWithOptions(
Options{
Templates: templates,
DefaultTags: defaultTags,
Separator: DefaultSeparator,
})
}
// Parse performs Graphite parsing of a single line.
func (p *Parser) Parse(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have required fields", line)
}
// decode the name and tags
template := p.matcher.Match(fields[0])
measurement, tags, field, err := template.Apply(fields[0])
if err != nil {
return nil, err
}
// Could not extract measurement, use the raw value
if measurement == "" {
measurement = fields[0]
}
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" value: %s`, fields[0], err)
}
if math.IsNaN(v) || math.IsInf(v, 0) {
return nil, &UnsupposedValueError{Field: fields[0], Value: v}
}
fieldValues := map[string]interface{}{}
if field != "" {
fieldValues[field] = v
} else {
fieldValues["value"] = v
}
// If no 3rd field, use now as timestamp
timestamp := time.Now().UTC()
if len(fields) == 3 {
// Parse timestamp.
unixTime, err := strconv.ParseFloat(fields[2], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" time: %s`, fields[0], err)
}
// -1 is a special value that gets converted to current UTC time
// See https://github.com/graphite-project/carbon/issues/54
if unixTime != float64(-1) {
// Check if we have fractional seconds
timestamp = time.Unix(int64(unixTime), int64((unixTime-math.Floor(unixTime))*float64(time.Second)))
if timestamp.Before(MinDate) || timestamp.After(MaxDate) {
return nil, fmt.Errorf("timestamp out of range")
}
}
}
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return telegraf.NewMetric(measurement, tags, fieldValues, timestamp)
}
// ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags.
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line)
if len(fields) == 0 {
return "", make(map[string]string), "", nil
}
// decode the name and tags
template := p.matcher.Match(fields[0])
name, tags, field, err := template.Apply(fields[0])
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return name, tags, field, err
}
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
type template struct {
tags []string
defaultTags models.Tags
greedyMeasurement bool
separator string
}
// NewTemplate returns a new template ensuring it has a measurement
// specified.
func NewTemplate(pattern string, defaultTags models.Tags, separator string) (*template, error) {
tags := strings.Split(pattern, ".")
hasMeasurement := false
template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
for _, tag := range tags {
if strings.HasPrefix(tag, "measurement") {
hasMeasurement = true
}
if tag == "measurement*" {
template.greedyMeasurement = true
}
}
if !hasMeasurement {
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
}
return template, nil
}
// Apply extracts the template fields from the given line and returns the measurement
// name and tags
func (t *template) Apply(line string) (string, map[string]string, string, error) {
fields := strings.Split(line, ".")
var (
measurement []string
tags = make(map[string]string)
field string
)
// Set any default tags
for k, v := range t.defaultTags {
tags[k] = v
}
for i, tag := range t.tags {
if i >= len(fields) {
continue
}
if tag == "measurement" {
measurement = append(measurement, fields[i])
} else if tag == "field" {
if len(field) != 0 {
return "", nil, "", fmt.Errorf("'field' can only be used once in each template: %q", line)
}
field = fields[i]
} else if tag == "measurement*" {
measurement = append(measurement, fields[i:]...)
break
} else if tag != "" {
tags[tag] = fields[i]
}
}
return strings.Join(measurement, t.separator), tags, field, nil
}
// matcher determines which template should be applied to a given metric
// based on a filter tree.
type matcher struct {
root *node
defaultTemplate *template
}
func newMatcher() *matcher {
return &matcher{
root: &node{},
}
}
// Add inserts the template in the filter tree based the given filter
func (m *matcher) Add(filter string, template *template) {
if filter == "" {
m.AddDefaultTemplate(template)
return
}
m.root.Insert(filter, template)
}
func (m *matcher) AddDefaultTemplate(template *template) {
m.defaultTemplate = template
}
// Match returns the template that matches the given graphite line
func (m *matcher) Match(line string) *template {
tmpl := m.root.Search(line)
if tmpl != nil {
return tmpl
}
return m.defaultTemplate
}
// node is an item in a sorted k-ary tree. Each child is sorted by its value.
// The special value of "*", is always last.
type node struct {
value string
children nodes
template *template
}
func (n *node) insert(values []string, template *template) {
// Add the end, set the template
if len(values) == 0 {
n.template = template
return
}
// See if the the current element already exists in the tree. If so, insert the
// into that sub-tree
for _, v := range n.children {
if v.value == values[0] {
v.insert(values[1:], template)
return
}
}
// New element, add it to the tree and sort the children
newNode := &node{value: values[0]}
n.children = append(n.children, newNode)
sort.Sort(&n.children)
// Now insert the rest of the tree into the new element
newNode.insert(values[1:], template)
}
// Insert inserts the given string template into the tree. The filter string is separated
// on "." and each part is used as the path in the tree.
func (n *node) Insert(filter string, template *template) {
n.insert(strings.Split(filter, "."), template)
}
func (n *node) search(lineParts []string) *template {
// Nothing to search
if len(lineParts) == 0 || len(n.children) == 0 {
return n.template
}
// If last element is a wildcard, don't include in this search since it's sorted
// to the end but lexicographically it would not always be and sort.Search assumes
// the slice is sorted.
length := len(n.children)
if n.children[length-1].value == "*" {
length--
}
// Find the index of child with an exact match
i := sort.Search(length, func(i int) bool {
return n.children[i].value >= lineParts[0]
})
// Found an exact match, so search that child sub-tree
if i < len(n.children) && n.children[i].value == lineParts[0] {
return n.children[i].search(lineParts[1:])
}
// Not an exact match, see if we have a wildcard child to search
if n.children[len(n.children)-1].value == "*" {
return n.children[len(n.children)-1].search(lineParts[1:])
}
return n.template
}
func (n *node) Search(line string) *template {
return n.search(strings.Split(line, "."))
}
type nodes []*node
// Less returns a boolean indicating whether the filter at position j
// is less than the filter at position k. Filters are order by string
// comparison of each component parts. A wildcard value "*" is never
// less than a non-wildcard value.
//
// For example, the filters:
// "*.*"
// "servers.*"
// "servers.localhost"
// "*.localhost"
//
// Would be sorted as:
// "servers.localhost"
// "servers.*"
// "*.localhost"
// "*.*"
func (n *nodes) Less(j, k int) bool {
if (*n)[j].value == "*" && (*n)[k].value != "*" {
return false
}
if (*n)[j].value != "*" && (*n)[k].value == "*" {
return true
}
return (*n)[j].value < (*n)[k].value
}
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
func (n *nodes) Len() int { return len(*n) }

View File

@ -1,160 +0,0 @@
# The graphite Input
## A note on UDP/IP OS Buffer sizes
If you're using UDP input and running Linux or FreeBSD, please adjust your UDP buffer
size limit, [see here for more details.](../udp/README.md#a-note-on-udpip-os-buffer-sizes)
## Configuration
Each Graphite input allows the binding address, and protocol to be set.
## Parsing Metrics
The graphite plugin allows measurements to be saved using the graphite line protocol. By default, enabling the graphite plugin will allow you to collect metrics and store them using the metric name as the measurement. If you send a metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags.
While this default setup works, it is not the ideal way to store measurements in InfluxDB since it does not take advantage of tags. It also will not perform optimally with a large dataset sizes since queries will be forced to use regexes which is known to not scale well.
To extract tags from metrics, one or more templates must be configured to parse metrics into tags and measurements.
## Templates
Templates allow matching parts of a metric name to be used as tag keys in the stored metric. They have a similar format to graphite metric names. The values in between the separators are used as the tag keys. The location of the tag key that matches the same position as the graphite metric section is used as the value. If there is no value, the graphite portion is skipped.
The special value _measurement_ is used to define the measurement name. It can have a trailing `*` to indicate that the remainder of the metric should be used. If a _measurement_ is not specified, the full metric name is used.
### Basic Matching
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement*`
* Output: _measurement_ =`loadavg.10` _tags_ =`host=localhost resource=cpu`
### Multiple Measurement Matching
The _measurement_ can be specified multiple times in a template to provide more control over the measurement name. Multiple values
will be joined together using the _Separator_ config variable. By default, this value is `.`.
`servers.localhost.cpu.cpu0.user`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu.user` _tags_ = `host=localhost cpu=cpu0`
Since '.' requires queries on measurements to be double-quoted, you may want to set this to `_` to simplify querying parsed metrics.
`servers.localhost.cpu.cpu0.user`
* Separator: `_`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu_user` _tags_ = `host=localhost cpu=cpu0`
### Adding Tags
Additional tags can be added to a metric that don't exist on the received metric. You can add additional tags by specifying them after the pattern. Tags have the same format as the line protocol. Multiple tags are separated by commas.
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement* region=us-west,zone=1a`
* Output: _measurement_ = `loadavg.10` _tags_ = `host=localhost resource=cpu region=us-west zone=1a`
### Fields
A field key can be specified by using the keyword _field_. By default if no _field_ keyword is specified then the metric will be written to a field named _value_.
When using the current default engine _BZ1_, it's recommended to use a single field per value for performance reasons.
When using the _TSM1_ engine it's possible to amend measurement metrics with additional fields, e.g:
Input:
```
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
```
With template:
```
sensu.metric.* ..measurement.host.interface.field
```
Becomes database entry:
```
> select * from net
name: net
---------
time host interface rx_bytes rx_dropped rx_errors rx_packets tx_bytes tx_dropped tx_errors
1444234982000000000 server0 eth0 1.015633926034834e+15 0 0 4.61295119435e+11 1.09308649338848e+15 0 0
```
## Multiple Templates
One template may not match all metrics. For example, using multiple plugins with diamond will produce metrics in different formats. If you need to use multiple templates, you'll need to define a prefix filter that must match before the template can be applied.
### Filters
Filters have a similar format to templates but work more like wildcard expressions. When multiple filters would match a metric, the more specific one is chosen. Filters are configured by adding them before the template.
For example,
```
servers.localhost.cpu.loadavg.10
servers.host123.elasticsearch.cache_hits 100
servers.host456.mysql.tx_count 10
servers.host789.prod.mysql.tx_count 10
```
* `servers.*` would match all values
* `servers.*.mysql` would match `servers.host456.mysql.tx_count 10`
* `servers.localhost.*` would match `servers.localhost.cpu.loadavg`
* `servers.*.*.mysql` would match `servers.host789.prod.mysql.tx_count 10`
## Default Templates
If no template filters are defined or you want to just have one basic template, you can define a default template. This template will apply to any metric that has not already matched a filter.
```
dev.http.requests.200
prod.myapp.errors.count
dev.db.queries.count
```
* `env.app.measurement*` would create
* _measurement_=`requests.200` _tags_=`env=dev,app=http`
* _measurement_= `errors.count` _tags_=`env=prod,app=myapp`
* _measurement_=`queries.count` _tags_=`env=dev,app=db`
## Global Tags
If you need to add the same set of tags to all metrics, you can define them globally at the plugin level and not within each template description.
## Minimal Config
```
[[inputs.graphite]]
bind_address = ":3003" # the bind address
protocol = "tcp" # or "udp" protocol to read via
udp_read_buffer = 8388608 # (8*1024*1024) UDP read buffer size
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=us-east", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
```

View File

@ -1,205 +0,0 @@
package graphite
import (
"fmt"
"strings"
"github.com/influxdata/influxdb/models"
)
const (
// DefaultBindAddress is the default binding interface if none is specified.
DefaultBindAddress = ":2003"
// DefaultDatabase is the default database if none is specified.
DefaultDatabase = "graphite"
// DefaultProtocol is the default IP protocol used by the Graphite input.
DefaultProtocol = "tcp"
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
// DefaultUDPReadBuffer is the default buffer size for the UDP listener.
// Sets the size of the operating system's receive buffer associated with
// the UDP traffic. Keep in mind that the OS must be able
// to handle the number set here or the UDP listener will error and exit.
//
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
// small for high UDP performance.
//
// Increasing OS buffer limits:
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultUdpReadBuffer = 0
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
BindAddress string
Protocol string
UdpReadBuffer int
Separator string
Tags []string
Templates []string
}
// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.BindAddress == "" {
d.BindAddress = DefaultBindAddress
}
if d.Protocol == "" {
d.Protocol = DefaultProtocol
}
if d.Separator == "" {
d.Separator = DefaultSeparator
}
if d.UdpReadBuffer == 0 {
d.UdpReadBuffer = DefaultUdpReadBuffer
}
return &d
}
// DefaultTags returns the config's tags.
func (c *Config) DefaultTags() models.Tags {
tags := models.Tags{}
for _, t := range c.Tags {
parts := strings.Split(t, "=")
tags[parts[0]] = parts[1]
}
return tags
}
// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}
if err := c.validateTags(); err != nil {
return err
}
return nil
}
func (c *Config) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}
for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}
template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}
if len(parts) == 3 {
tags = parts[2]
}
// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}
// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}
if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}
if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}
func (c *Config) validateTags() error {
for _, t := range c.Tags {
if err := c.validateTag(t); err != nil {
return err
}
}
return nil
}
func (c *Config) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}
if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}
return nil
}
func (c *Config) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}
if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}
func (c *Config) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}
if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}
return nil
}

View File

@ -1,315 +0,0 @@
package graphite
import (
"bufio"
"fmt"
"log"
"math"
"net"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
udpBufferSize = 65536
)
type tcpConnection struct {
conn net.Conn
connectTime time.Time
}
func (c *tcpConnection) Close() {
c.conn.Close()
}
// Graphite represents a Graphite service.
type Graphite struct {
BindAddress string
Protocol string
UdpReadBuffer int
Separator string
Tags []string
Templates []string
mu sync.Mutex
parser *Parser
logger *log.Logger
config *Config
tcpConnectionsMu sync.Mutex
tcpConnections map[string]*tcpConnection
ln net.Listener
addr net.Addr
udpConn *net.UDPConn
wg sync.WaitGroup
done chan struct{}
// channel for all incoming parsed points
metricC chan telegraf.Metric
}
var sampleConfig = `
bind_address = ":2003" # the bind address
protocol = "tcp" # or "udp" protocol to read via
udp_read_buffer = 8388608 # (8*1024*1024) UDP read buffer size
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-china", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
`
func (g *Graphite) SampleConfig() string {
return sampleConfig
}
func (g *Graphite) Description() string {
return "Graphite read line-protocol metrics from tcp/udp socket"
}
// Open starts the Graphite input processing data.
func (g *Graphite) Start() error {
g.mu.Lock()
defer g.mu.Unlock()
c := &Config{
BindAddress: g.BindAddress,
Protocol: g.Protocol,
UdpReadBuffer: g.UdpReadBuffer,
Separator: g.Separator,
Tags: g.Tags,
Templates: g.Templates,
}
c.WithDefaults()
if err := c.Validate(); err != nil {
return fmt.Errorf("Graphite input configuration is error! ", err.Error())
}
g.config = c
parser, err := NewParserWithOptions(Options{
Templates: g.config.Templates,
DefaultTags: g.config.DefaultTags(),
Separator: g.config.Separator})
if err != nil {
return fmt.Errorf("Graphite input parser config is error! ", err.Error())
}
g.parser = parser
g.tcpConnections = make(map[string]*tcpConnection)
g.done = make(chan struct{})
g.metricC = make(chan telegraf.Metric, 10000)
if strings.ToLower(g.config.Protocol) == "tcp" {
g.addr, err = g.openTCPServer()
} else if strings.ToLower(g.config.Protocol) == "udp" {
g.addr, err = g.openUDPServer()
} else {
return fmt.Errorf("unrecognized Graphite input protocol %s", g.config.Protocol)
}
if err != nil {
return err
}
g.logger.Printf("Listening on %s: %s", strings.ToUpper(g.config.Protocol), g.config.BindAddress)
return nil
}
func (g *Graphite) closeAllConnections() {
g.tcpConnectionsMu.Lock()
defer g.tcpConnectionsMu.Unlock()
for _, c := range g.tcpConnections {
c.Close()
}
}
// Close stops all data processing on the Graphite input.
func (g *Graphite) Stop() {
g.mu.Lock()
defer g.mu.Unlock()
g.closeAllConnections()
if g.ln != nil {
g.ln.Close()
}
if g.udpConn != nil {
g.udpConn.Close()
}
close(g.done)
g.wg.Wait()
g.done = nil
}
// openTCPServer opens the Graphite input in TCP mode and starts processing data.
func (g *Graphite) openTCPServer() (net.Addr, error) {
ln, err := net.Listen("tcp", g.config.BindAddress)
if err != nil {
return nil, err
}
g.ln = ln
g.wg.Add(1)
go func() {
defer g.wg.Done()
for {
conn, err := g.ln.Accept()
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
g.logger.Println("graphite TCP listener closed")
return
}
if err != nil {
g.logger.Println("error accepting TCP connection", err.Error())
continue
}
g.wg.Add(1)
go g.handleTCPConnection(conn)
}
}()
return ln.Addr(), nil
}
// handleTCPConnection services an individual TCP connection for the Graphite input.
func (g *Graphite) handleTCPConnection(conn net.Conn) {
defer g.wg.Done()
defer conn.Close()
defer g.untrackConnection(conn)
g.trackConnection(conn)
reader := bufio.NewReader(conn)
for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err != nil {
return
}
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
g.handleLine(line)
}
}
func (g *Graphite) trackConnection(c net.Conn) {
g.tcpConnectionsMu.Lock()
defer g.tcpConnectionsMu.Unlock()
g.tcpConnections[c.RemoteAddr().String()] = &tcpConnection{
conn: c,
connectTime: time.Now().UTC(),
}
}
func (g *Graphite) untrackConnection(c net.Conn) {
g.tcpConnectionsMu.Lock()
defer g.tcpConnectionsMu.Unlock()
delete(g.tcpConnections, c.RemoteAddr().String())
}
// openUDPServer opens the Graphite input in UDP mode and starts processing incoming data.
func (g *Graphite) openUDPServer() (net.Addr, error) {
addr, err := net.ResolveUDPAddr("udp", g.config.BindAddress)
if err != nil {
return nil, err
}
g.udpConn, err = net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
if g.config.UdpReadBuffer != 0 {
err = g.udpConn.SetReadBuffer(g.config.UdpReadBuffer)
if err != nil {
return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s",
g.config.UdpReadBuffer, err)
}
}
buf := make([]byte, udpBufferSize)
g.wg.Add(1)
go func() {
defer g.wg.Done()
for {
n, _, err := g.udpConn.ReadFromUDP(buf)
if err != nil {
g.udpConn.Close()
return
}
lines := strings.Split(string(buf[:n]), "\n")
for _, line := range lines {
g.handleLine(line)
}
}
}()
return g.udpConn.LocalAddr(), nil
}
func (g *Graphite) handleLine(line string) {
if line == "" {
return
}
// Parse it.
metric, err := g.parser.Parse(line)
if err != nil {
switch err := err.(type) {
case *UnsupposedValueError:
// Graphite ignores NaN values with no error.
if math.IsNaN(err.Value) {
return
}
}
g.logger.Printf("unable to parse line: %s: %s", line, err)
return
}
g.metricC <- metric
}
func (g *Graphite) Gather(acc telegraf.Accumulator) error {
g.mu.Lock()
defer g.mu.Unlock()
npoints := len(g.metricC)
for i := 0; i < npoints; i++ {
metric := <-g.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
func init() {
inputs.Add("graphite", func() telegraf.Input {
return &Graphite{logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags)}
})
}

View File

@ -1,159 +0,0 @@
# The graphite Input
## A note on UDP/IP OS Buffer sizes
If you're using UDP input and running Linux or FreeBSD, please adjust your UDP buffer
size limit, [see here for more details.](../udp/README.md#a-note-on-udpip-os-buffer-sizes)
## Configuration
Each Graphite input allows the binding address, and protocol to be set.
## Parsing Metrics
The graphite plugin allows measurements to be saved using the graphite line protocol. By default, enabling the graphite plugin will allow you to collect metrics and store them using the metric name as the measurement. If you send a metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags.
While this default setup works, it is not the ideal way to store measurements in InfluxDB since it does not take advantage of tags. It also will not perform optimally with a large dataset sizes since queries will be forced to use regexes which is known to not scale well.
To extract tags from metrics, one or more templates must be configured to parse metrics into tags and measurements.
## Templates
Templates allow matching parts of a metric name to be used as tag keys in the stored metric. They have a similar format to graphite metric names. The values in between the separators are used as the tag keys. The location of the tag key that matches the same position as the graphite metric section is used as the value. If there is no value, the graphite portion is skipped.
The special value _measurement_ is used to define the measurement name. It can have a trailing `*` to indicate that the remainder of the metric should be used. If a _measurement_ is not specified, the full metric name is used.
### Basic Matching
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement*`
* Output: _measurement_ =`loadavg.10` _tags_ =`host=localhost resource=cpu`
### Multiple Measurement Matching
The _measurement_ can be specified multiple times in a template to provide more control over the measurement name. Multiple values
will be joined together using the _Separator_ config variable. By default, this value is `.`.
`servers.localhost.cpu.cpu0.user`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu.user` _tags_ = `host=localhost cpu=cpu0`
Since '.' requires queries on measurements to be double-quoted, you may want to set this to `_` to simplify querying parsed metrics.
`servers.localhost.cpu.cpu0.user`
* Separator: `_`
* Template: `.host.measurement.cpu.measurement`
* Output: _measurement_ = `cpu_user` _tags_ = `host=localhost cpu=cpu0`
### Adding Tags
Additional tags can be added to a metric that don't exist on the received metric. You can add additional tags by specifying them after the pattern. Tags have the same format as the line protocol. Multiple tags are separated by commas.
`servers.localhost.cpu.loadavg.10`
* Template: `.host.resource.measurement* region=us-west,zone=1a`
* Output: _measurement_ = `loadavg.10` _tags_ = `host=localhost resource=cpu region=us-west zone=1a`
### Fields
A field key can be specified by using the keyword _field_. By default if no _field_ keyword is specified then the metric will be written to a field named _value_.
When using the current default engine _BZ1_, it's recommended to use a single field per value for performance reasons.
When using the _TSM1_ engine it's possible to amend measurement metrics with additional fields, e.g:
Input:
```
sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982
sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982
sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982
sensu.metric.net.server0.eth0.tx_errors 0 1444234982
sensu.metric.net.server0.eth0.rx_errors 0 1444234982
sensu.metric.net.server0.eth0.tx_dropped 0 1444234982
sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
```
With template:
```
sensu.metric.* ..measurement.host.interface.field
```
Becomes database entry:
```
> select * from net
name: net
---------
time host interface rx_bytes rx_dropped rx_errors rx_packets tx_bytes tx_dropped tx_errors
1444234982000000000 server0 eth0 1.015633926034834e+15 0 0 4.61295119435e+11 1.09308649338848e+15 0 0
```
## Multiple Templates
One template may not match all metrics. For example, using multiple plugins with diamond will produce metrics in different formats. If you need to use multiple templates, you'll need to define a prefix filter that must match before the template can be applied.
### Filters
Filters have a similar format to templates but work more like wildcard expressions. When multiple filters would match a metric, the more specific one is chosen. Filters are configured by adding them before the template.
For example,
```
servers.localhost.cpu.loadavg.10
servers.host123.elasticsearch.cache_hits 100
servers.host456.mysql.tx_count 10
servers.host789.prod.mysql.tx_count 10
```
* `servers.*` would match all values
* `servers.*.mysql` would match `servers.host456.mysql.tx_count 10`
* `servers.localhost.*` would match `servers.localhost.cpu.loadavg`
* `servers.*.*.mysql` would match `servers.host789.prod.mysql.tx_count 10`
## Default Templates
If no template filters are defined or you want to just have one basic template, you can define a default template. This template will apply to any metric that has not already matched a filter.
```
dev.http.requests.200
prod.myapp.errors.count
dev.db.queries.count
```
* `env.app.measurement*` would create
* _measurement_=`requests.200` _tags_=`env=dev,app=http`
* _measurement_= `errors.count` _tags_=`env=prod,app=myapp`
* _measurement_=`queries.count` _tags_=`env=dev,app=db`
## Global Tags
If you need to add the same set of tags to all metrics, you can define them globally at the plugin level and not within each template description.
## Minimal Config
```
[[inputs.tail]]
### The file to be monited by this tail plugin
files = ["/tmp/test","/tmp/test2"]
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-china", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
```

View File

@ -1,14 +0,0 @@
package tail
import "fmt"
// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}
func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}

View File

@ -1,392 +0,0 @@
package tail
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf"
)
// Minimum and maximum supported dates for timestamps.
var (
MinDate = time.Date(1901, 12, 13, 0, 0, 0, 0, time.UTC)
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
)
var defaultTemplate *template
func init() {
var err error
defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator)
if err != nil {
panic(err)
}
}
// Parser encapsulates a Graphite Parser.
type Parser struct {
matcher *matcher
tags models.Tags
}
// Options are configurable values that can be provided to a Parser
type Options struct {
Separator string
Templates []string
DefaultTags models.Tags
}
// NewParserWithOptions returns a graphite parser using the given options
func NewParserWithOptions(options Options) (*Parser, error) {
matcher := newMatcher()
matcher.AddDefaultTemplate(defaultTemplate)
for _, pattern := range options.Templates {
template := pattern
filter := ""
// Format is [filter] <template> [tag1=value1,tag2=value2]
parts := strings.Fields(pattern)
if len(parts) < 1 {
continue
} else if len(parts) >= 2 {
if strings.Contains(parts[1], "=") {
template = parts[0]
} else {
filter = parts[0]
template = parts[1]
}
}
// Parse out the default tags specific to this template
tags := models.Tags{}
if strings.Contains(parts[len(parts)-1], "=") {
tagStrs := strings.Split(parts[len(parts)-1], ",")
for _, kv := range tagStrs {
parts := strings.Split(kv, "=")
tags[parts[0]] = parts[1]
}
}
tmpl, err := NewTemplate(template, tags, options.Separator)
if err != nil {
return nil, err
}
matcher.Add(filter, tmpl)
}
return &Parser{matcher: matcher, tags: options.DefaultTags}, nil
}
// NewParser returns a GraphiteParser instance.
func NewParser(templates []string, defaultTags models.Tags) (*Parser, error) {
return NewParserWithOptions(
Options{
Templates: templates,
DefaultTags: defaultTags,
Separator: DefaultSeparator,
})
}
// Parse performs Graphite parsing of a single line.
func (p *Parser) Parse(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have required fields", line)
}
// decode the name and tags
template := p.matcher.Match(fields[0])
measurement, tags, field, err := template.Apply(fields[0])
if err != nil {
return nil, err
}
// Could not extract measurement, use the raw value
if measurement == "" {
measurement = fields[0]
}
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" value: %s`, fields[0], err)
}
if math.IsNaN(v) || math.IsInf(v, 0) {
return nil, &UnsupposedValueError{Field: fields[0], Value: v}
}
fieldValues := map[string]interface{}{}
if field != "" {
fieldValues[field] = v
} else {
fieldValues["value"] = v
}
// If no 3rd field, use now as timestamp
timestamp := time.Now().UTC()
if len(fields) == 3 {
// Parse timestamp.
unixTime, err := strconv.ParseFloat(fields[2], 64)
if err != nil {
return nil, fmt.Errorf(`field "%s" time: %s`, fields[0], err)
}
// -1 is a special value that gets converted to current UTC time
// See https://github.com/graphite-project/carbon/issues/54
if unixTime != float64(-1) {
// Check if we have fractional seconds
timestamp = time.Unix(int64(unixTime), int64((unixTime-math.Floor(unixTime))*float64(time.Second)))
if timestamp.Before(MinDate) || timestamp.After(MaxDate) {
return nil, fmt.Errorf("timestamp out of range")
}
}
}
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return telegraf.NewMetric(measurement, tags, fieldValues, timestamp)
}
// ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags.
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line)
if len(fields) == 0 {
return "", make(map[string]string), "", nil
}
// decode the name and tags
template := p.matcher.Match(fields[0])
name, tags, field, err := template.Apply(fields[0])
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return name, tags, field, err
}
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
type template struct {
tags []string
defaultTags models.Tags
greedyMeasurement bool
separator string
}
// NewTemplate returns a new template ensuring it has a measurement
// specified.
func NewTemplate(pattern string, defaultTags models.Tags, separator string) (*template, error) {
tags := strings.Split(pattern, ".")
hasMeasurement := false
template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
for _, tag := range tags {
if strings.HasPrefix(tag, "measurement") {
hasMeasurement = true
}
if tag == "measurement*" {
template.greedyMeasurement = true
}
}
if !hasMeasurement {
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
}
return template, nil
}
// Apply extracts the template fields from the given line and returns the measurement
// name and tags
func (t *template) Apply(line string) (string, map[string]string, string, error) {
fields := strings.Split(line, ".")
var (
measurement []string
tags = make(map[string]string)
field string
)
// Set any default tags
for k, v := range t.defaultTags {
tags[k] = v
}
for i, tag := range t.tags {
if i >= len(fields) {
continue
}
if tag == "measurement" {
measurement = append(measurement, fields[i])
} else if tag == "field" {
if len(field) != 0 {
return "", nil, "", fmt.Errorf("'field' can only be used once in each template: %q", line)
}
field = fields[i]
} else if tag == "measurement*" {
measurement = append(measurement, fields[i:]...)
break
} else if tag != "" {
tags[tag] = fields[i]
}
}
return strings.Join(measurement, t.separator), tags, field, nil
}
// matcher determines which template should be applied to a given metric
// based on a filter tree.
type matcher struct {
root *node
defaultTemplate *template
}
func newMatcher() *matcher {
return &matcher{
root: &node{},
}
}
// Add inserts the template in the filter tree based the given filter
func (m *matcher) Add(filter string, template *template) {
if filter == "" {
m.AddDefaultTemplate(template)
return
}
m.root.Insert(filter, template)
}
func (m *matcher) AddDefaultTemplate(template *template) {
m.defaultTemplate = template
}
// Match returns the template that matches the given graphite line
func (m *matcher) Match(line string) *template {
tmpl := m.root.Search(line)
if tmpl != nil {
return tmpl
}
return m.defaultTemplate
}
// node is an item in a sorted k-ary tree. Each child is sorted by its value.
// The special value of "*", is always last.
type node struct {
value string
children nodes
template *template
}
func (n *node) insert(values []string, template *template) {
// Add the end, set the template
if len(values) == 0 {
n.template = template
return
}
// See if the the current element already exists in the tree. If so, insert the
// into that sub-tree
for _, v := range n.children {
if v.value == values[0] {
v.insert(values[1:], template)
return
}
}
// New element, add it to the tree and sort the children
newNode := &node{value: values[0]}
n.children = append(n.children, newNode)
sort.Sort(&n.children)
// Now insert the rest of the tree into the new element
newNode.insert(values[1:], template)
}
// Insert inserts the given string template into the tree. The filter string is separated
// on "." and each part is used as the path in the tree.
func (n *node) Insert(filter string, template *template) {
n.insert(strings.Split(filter, "."), template)
}
func (n *node) search(lineParts []string) *template {
// Nothing to search
if len(lineParts) == 0 || len(n.children) == 0 {
return n.template
}
// If last element is a wildcard, don't include in this search since it's sorted
// to the end but lexicographically it would not always be and sort.Search assumes
// the slice is sorted.
length := len(n.children)
if n.children[length-1].value == "*" {
length--
}
// Find the index of child with an exact match
i := sort.Search(length, func(i int) bool {
return n.children[i].value >= lineParts[0]
})
// Found an exact match, so search that child sub-tree
if i < len(n.children) && n.children[i].value == lineParts[0] {
return n.children[i].search(lineParts[1:])
}
// Not an exact match, see if we have a wildcard child to search
if n.children[len(n.children)-1].value == "*" {
return n.children[len(n.children)-1].search(lineParts[1:])
}
return n.template
}
func (n *node) Search(line string) *template {
return n.search(strings.Split(line, "."))
}
type nodes []*node
// Less returns a boolean indicating whether the filter at position j
// is less than the filter at position k. Filters are order by string
// comparison of each component parts. A wildcard value "*" is never
// less than a non-wildcard value.
//
// For example, the filters:
// "*.*"
// "servers.*"
// "servers.localhost"
// "*.localhost"
//
// Would be sorted as:
// "servers.localhost"
// "servers.*"
// "*.localhost"
// "*.*"
func (n *nodes) Less(j, k int) bool {
if (*n)[j].value == "*" && (*n)[k].value != "*" {
return false
}
if (*n)[j].value != "*" && (*n)[k].value == "*" {
return true
}
return (*n)[j].value < (*n)[k].value
}
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
func (n *nodes) Len() int { return len(*n) }

View File

@ -1,186 +0,0 @@
package tail
import (
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"github.com/hpcloud/tail"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Tail represents a tail service to
// use line-protocol to read metrics from the file given
type Tail struct {
Files []string
Separator string
Tags []string
Templates []string
mu sync.Mutex
parser *Parser
logger *log.Logger
config *Config
tailPointers []*tail.Tail
wg sync.WaitGroup
done chan struct{}
// channel for all incoming parsed points
metricC chan telegraf.Metric
}
var sampleConfig = `
### The file to be monited by this tail plugin
files = ["/tmp/test","/tmp/test2"]
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
tags = ["region=north-china", "zone=1c"]
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### filter + template
### filter + template + extra tag
### filter + template with field key
### default template. Ignore the first graphite component "servers"
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
`
func (t *Tail) SampleConfig() string {
return sampleConfig
}
func (t *Tail) Description() string {
return "Tail read line-protocol metrics from the file given!"
}
// Open starts the Graphite input processing data.
func (t *Tail) Start() error {
t.mu.Lock()
defer t.mu.Unlock()
c := &Config{
Files: t.Files,
Separator: t.Separator,
Tags: t.Tags,
Templates: t.Templates,
}
c.WithDefaults()
if err := c.Validate(); err != nil {
return fmt.Errorf("Graphite input configuration is error! ", err.Error())
}
t.config = c
parser, err := NewParserWithOptions(Options{
Templates: t.config.Templates,
DefaultTags: t.config.DefaultTags(),
Separator: t.config.Separator})
if err != nil {
return fmt.Errorf("Graphite input parser config is error! ", err.Error())
}
t.parser = parser
t.done = make(chan struct{})
t.metricC = make(chan telegraf.Metric, 10000)
t.tailPointers = make([]*tail.Tail, len(t.Files))
for i, fileName := range t.Files {
t.tailPointers[i], err = t.tailFile(fileName)
if err != nil {
fmt.Errorf("Can not open the file: %s to tail", fileName)
} else {
t.logger.Printf("Openning the file: %s to tail", fileName)
}
}
return nil
}
func (t *Tail) tailFile(fileName string) (*tail.Tail, error) {
tailPointer, err := tail.TailFile(fileName, tail.Config{Follow: true})
if err != nil {
return nil, err
}
t.wg.Add(1)
go func() {
defer t.wg.Done()
for line := range tailPointer.Lines {
t.handleLine(strings.TrimSpace(line.Text))
}
tailPointer.Wait()
}()
return tailPointer, nil
}
func (t *Tail) handleLine(line string) {
if line == "" {
return
}
// Parse it.
mertic, err := t.parser.Parse(line)
if err != nil {
switch err := err.(type) {
case *UnsupposedValueError:
// Graphite ignores NaN values with no error.
if math.IsNaN(err.Value) {
return
}
}
t.logger.Printf("unable to parse line: %s: %s", line, err)
return
}
t.metricC <- mertic
}
// Close stops all data processing on the Graphite input.
func (t *Tail) Stop() {
t.mu.Lock()
defer t.mu.Unlock()
for _, tailPointer := range t.tailPointers {
tailPointer.Cleanup()
tailPointer.Stop()
}
close(t.done)
t.wg.Wait()
t.done = nil
}
func (t *Tail) Gather(acc telegraf.Accumulator) error {
t.mu.Lock()
defer t.mu.Unlock()
npoints := len(t.metricC)
for i := 0; i < npoints; i++ {
metric := <-t.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
func init() {
inputs.Add("tail", func() telegraf.Input {
return &Tail{logger: log.New(os.Stderr, "[tail] ", log.LstdFlags)}
})
}