Implementing generic parser plugins and documentation

This constitutes a large change in how we will parse different data
formats going forward (for the plugins that support it)

This is working off @henrypfhu's changes.
This commit is contained in:
Cameron Sparr
2016-02-05 17:36:35 -07:00
parent 1449c8b887
commit e619493ece
32 changed files with 1971 additions and 522 deletions

View File

@@ -10,8 +10,8 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
const statsPath = "/_nodes/stats"
@@ -168,7 +168,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
now := time.Now()
for p, s := range stats {
f := internal.JSONFlattener{}
f := jsonparser.JSONFlattener{}
err := f.FlattenJSON("", s)
if err != nil {
return err

View File

@@ -8,8 +8,8 @@ The exec plugin can execute arbitrary commands which output:
> Graphite understands messages with this format:
> ```
metric_path value timestamp\n
> ```
metric_path value timestamp\n
```
> __metric_path__ is the metric namespace that you want to populate.
@@ -28,10 +28,7 @@ and strings will be ignored.
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
# compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/mycollector --foo=bar"
commands = ["/tmp/test.sh","/tmp/test2.sh"]
commands = ["/tmp/test.sh", "/tmp/test2.sh"]
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
@@ -128,7 +125,7 @@ and usage_busy. They will receive a timestamp at collection time.
We can also change the data_format to "graphite" to use the metrics collecting scripts such as (compatible with graphite):
* Nagios [Mertics Plugins] (https://exchange.nagios.org/directory/Plugins)
* Sensu [Mertics Plugins] (https://github.com/sensu-plugins)
* Sensu [Mertics Plugins] (https://github.com/sensu-plugins)
#### Configuration
```
@@ -180,4 +177,4 @@ sensu.metric.net.server0.eth0.rx_dropped 0 1444234982
The templates configuration will be used to parse the graphite metrics to support influxdb/opentsdb tagging store engines.
More detail information about templates, please refer to [The graphite Input] (https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md)

View File

@@ -9,66 +9,40 @@ import (
"github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/internal/encoding/graphite"
_ "github.com/influxdata/telegraf/internal/encoding/influx"
_ "github.com/influxdata/telegraf/internal/encoding/json"
"github.com/influxdata/telegraf/plugins/parsers"
)
const sampleConfig = `
# Shell/commands array
# compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/mycollector --foo=bar"
commands = ["/tmp/test.sh","/tmp/test2.sh"]
### Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
# Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
### measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",
"stats2.* .host.measurement.field",
"measurement*"
]
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md
data_format = "influx"
`
type Exec struct {
Commands []string
Command string
DataFormat string
Commands []string
Command string
Separator string
Templates []string
encodingParser encoding.Parser
initedConfig bool
parser parsers.Parser
wg sync.WaitGroup
sync.Mutex
runner Runner
errc chan error
runner Runner
errChan chan error
}
func NewExec() *Exec {
return &Exec{
runner: CommandRunner{},
}
}
type Runner interface {
@@ -95,22 +69,18 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
return out.Bytes(), nil
}
func NewExec() *Exec {
return &Exec{runner: CommandRunner{}}
}
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()
out, err := e.runner.Run(e, command)
if err != nil {
e.errc <- err
e.errChan <- err
return
}
metrics, err := e.encodingParser.Parse(out)
metrics, err := e.parser.Parse(out)
if err != nil {
e.errc <- err
e.errChan <- err
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
@@ -118,66 +88,33 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
}
}
func (e *Exec) initConfig() error {
e.Lock()
defer e.Unlock()
if e.Command != "" && len(e.Commands) < 1 {
e.Commands = []string{e.Command}
}
if e.DataFormat == "" {
e.DataFormat = "json"
}
var err error
configs := make(map[string]interface{})
configs["Separator"] = e.Separator
configs["Templates"] = e.Templates
e.encodingParser, err = encoding.NewParser(e.DataFormat, configs)
if err != nil {
return fmt.Errorf("exec configuration is error: %s ", err.Error())
}
return nil
}
func (e *Exec) SampleConfig() string {
return sampleConfig
}
func (e *Exec) Description() string {
return "Read metrics from one or more commands that can output JSON, influx or graphite line protocol to stdout"
return "Read metrics from one or more commands that can output to stdout"
}
func (e *Exec) SetParser(parser parsers.Parser) {
e.parser = parser
}
func (e *Exec) Gather(acc telegraf.Accumulator) error {
e.errChan = make(chan error, len(e.Commands))
if !e.initedConfig {
if err := e.initConfig(); err != nil {
return err
}
e.initedConfig = true
}
e.Lock()
e.errc = make(chan error, 10)
e.Unlock()
e.wg.Add(len(e.Commands))
for _, command := range e.Commands {
e.wg.Add(1)
go e.ProcessCommand(command, acc)
}
e.wg.Wait()
select {
default:
close(e.errc)
close(e.errChan)
return nil
case err := <-e.errc:
close(e.errc)
case err := <-e.errChan:
close(e.errChan)
return err
}

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -63,9 +65,11 @@ func (r runnerMock) Run(e *Exec, command string) ([]byte, error) {
}
func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
parser: parser,
}
var acc testutil.Accumulator
@@ -87,9 +91,11 @@ func TestExec(t *testing.T) {
}
func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
parser: parser,
}
var acc testutil.Accumulator
@@ -99,9 +105,11 @@ func TestExecMalformed(t *testing.T) {
}
func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
parser: parser,
}
var acc testutil.Accumulator
@@ -111,10 +119,11 @@ func TestCommandError(t *testing.T) {
}
func TestLineProtocolParse(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Commands: []string{"line-protocol"},
DataFormat: "influx",
runner: newRunnerMock([]byte(lineProtocol), nil),
Commands: []string{"line-protocol"},
parser: parser,
}
var acc testutil.Accumulator
@@ -133,10 +142,11 @@ func TestLineProtocolParse(t *testing.T) {
}
func TestLineProtocolParseMultiple(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Commands: []string{"line-protocol"},
DataFormat: "influx",
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Commands: []string{"line-protocol"},
parser: parser,
}
var acc testutil.Accumulator
@@ -158,15 +168,3 @@ func TestLineProtocolParseMultiple(t *testing.T) {
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
}
func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Commands: []string{"bad data format"},
DataFormat: "FooBar",
}
var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}

View File

@@ -1,7 +1,6 @@
package httpjson
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
@@ -12,8 +11,8 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type HttpJson struct {
@@ -137,39 +136,34 @@ func (h *HttpJson) gatherServer(
return err
}
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(resp), &jsonOut); err != nil {
return errors.New("Error decoding JSON response")
}
tags := map[string]string{
"server": serverURL,
}
for _, tag := range h.TagKeys {
switch v := jsonOut[tag].(type) {
case string:
tags[tag] = v
}
delete(jsonOut, tag)
}
if responseTime >= 0 {
jsonOut["response_time"] = responseTime
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
var msrmnt_name string
if h.Name == "" {
msrmnt_name = "httpjson"
} else {
msrmnt_name = "httpjson_" + h.Name
}
acc.AddFields(msrmnt_name, f.Fields, tags)
tags := map[string]string{
"server": serverURL,
}
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
if err != nil {
return err
}
metrics, err := parser.Parse([]byte(resp))
if err != nil {
return err
}
for _, metric := range metrics {
fields := make(map[string]interface{})
for k, v := range metric.Fields() {
fields[k] = v
}
fields["response_time"] = responseTime
acc.AddFields(metric.Name(), fields, metric.Tags())
}
return nil
}

View File

@@ -1,12 +1,14 @@
package kafka_consumer
import (
"fmt"
"log"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
@@ -20,6 +22,8 @@ type Kafka struct {
PointBuffer int
Offset string
parser parsers.Parser
sync.Mutex
// channel for all incoming kafka messages
@@ -36,16 +40,22 @@ type Kafka struct {
}
var sampleConfig = `
# topic(s) to consume
### topic(s) to consume
topics = ["telegraf"]
# an array of Zookeeper connection strings
### an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
# the name of the consumer group
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
# Offset (must be either "oldest" or "newest")
### Offset (must be either "oldest" or "newest")
offset = "oldest"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md
data_format = "influx"
`
func (k *Kafka) SampleConfig() string {
@@ -53,7 +63,11 @@ func (k *Kafka) SampleConfig() string {
}
func (k *Kafka) Description() string {
return "Read line-protocol metrics from Kafka topic(s)"
return "Read metrics from Kafka topic(s)"
}
func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}
func (k *Kafka) Start() error {
@@ -96,15 +110,15 @@ func (k *Kafka) Start() error {
k.metricC = make(chan telegraf.Metric, k.PointBuffer)
// Start the kafka message reader
go k.parser()
go k.receiver()
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
// parser() reads all incoming messages from the consumer, and parses them into
// receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (k *Kafka) parser() {
func (k *Kafka) receiver() {
for {
select {
case <-k.done:
@@ -112,13 +126,14 @@ func (k *Kafka) parser() {
case err := <-k.errs:
log.Printf("Kafka Consumer Error: %s\n", err.Error())
case msg := <-k.in:
metrics, err := telegraf.ParseMetrics(msg.Value)
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
string(msg.Value), err.Error())
}
for _, metric := range metrics {
fmt.Println(string(metric.Name()))
select {
case k.metricC <- metric:
continue

View File

@@ -9,6 +9,8 @@ import (
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
)
func TestReadsMetricsFromKafka(t *testing.T) {
@@ -40,6 +42,8 @@ func TestReadsMetricsFromKafka(t *testing.T) {
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
if err := k.Start(); err != nil {
t.Fatal(err.Error())
} else {

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/Shopify/sarama"
@@ -12,9 +13,11 @@ import (
)
const (
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257"
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5
)
func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
@@ -39,7 +42,8 @@ func TestRunParser(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
go k.parser()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond)
@@ -51,7 +55,8 @@ func TestRunParserInvalidMsg(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
go k.parser()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(invalidMsg)
time.Sleep(time.Millisecond)
@@ -63,7 +68,8 @@ func TestRunParserRespectsBuffer(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
go k.parser()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
for i := 0; i < pointBuffer+1; i++ {
in <- saramaMsg(testMsg)
}
@@ -77,7 +83,8 @@ func TestRunParserAndGather(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
go k.parser()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond)
@@ -89,6 +96,45 @@ func TestRunParserAndGather(t *testing.T) {
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherGraphite(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgGraphite)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
k.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 1)
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherJSON(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgJSON)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
k.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 1)
acc.AssertContainsFields(t, "kafka_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}
func saramaMsg(val string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,

View File

@@ -11,7 +11,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/services/graphite"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@@ -123,37 +123,39 @@ func (_ *Statsd) Description() string {
}
const sampleConfig = `
# Address and port to host UDP listener on
### Address and port to host UDP listener on
service_address = ":8125"
# Delete gauges every interval (default=false)
### Delete gauges every interval (default=false)
delete_gauges = false
# Delete counters every interval (default=false)
### Delete counters every interval (default=false)
delete_counters = false
# Delete sets every interval (default=false)
### Delete sets every interval (default=false)
delete_sets = false
# Delete timings & histograms every interval (default=true)
### Delete timings & histograms every interval (default=true)
delete_timings = true
# Percentiles to calculate for timing & histogram stats
### Percentiles to calculate for timing & histogram stats
percentiles = [90]
# convert measurement names, "." to "_" and "-" to "__"
### convert measurement names, "." to "_" and "-" to "__"
convert_names = true
### Statsd data translation templates, more info can be read here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md#graphite
# templates = [
# "cpu.* measurement*"
# ]
# Number of UDP messages allowed to queue up, once filled,
# the statsd server will start dropping packets
### Number of UDP messages allowed to queue up, once filled,
### the statsd server will start dropping packets
allowed_pending_messages = 10000
# Number of timing/histogram values to track per-measurement in the
# calculation of percentiles. Raising this limit increases the accuracy
# of percentiles but also increases the memory usage and cpu time.
### Number of timing/histogram values to track per-measurement in the
### calculation of percentiles. Raising this limit increases the accuracy
### of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
# UDP packet size for the server to listen for. This will depend on the size
# of the packets that the client is sending, which is usually 1500 bytes.
### UDP packet size for the server to listen for. This will depend on the size
### of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`
@@ -418,18 +420,14 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
}
}
o := graphite.Options{
Separator: "_",
Templates: s.Templates,
DefaultTags: tags,
}
var field string
name := bucketparts[0]
p, err := graphite.NewParserWithOptions(o)
p, err := graphite.NewGraphiteParser(".", s.Templates, nil)
if err == nil {
p.DefaultTags = tags
name, tags, field, _ = p.ApplyTemplate(name)
}
if s.ConvertNames {
name = strings.Replace(name, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1)