Compare commits

..

8 Commits

Author SHA1 Message Date
Max U
91964888c9 comments reflect deprecated NewJsonParser 2018-06-28 16:05:58 -07:00
Max U
e25fbed0de add old exported func NewJSONParser 2018-06-28 16:00:32 -07:00
Max U
674b9578d7 readability tweak 2018-06-28 15:56:36 -07:00
Max U
885193bc8f change test files to reflect unexported newJsonParser func 2018-06-28 15:39:39 -07:00
Max U
4db667573a newJsonParser function is now unexported, must use NewParser(config) 2018-06-28 15:35:20 -07:00
Max U
7f2b2a08ae test case edit 2018-06-28 11:45:26 -07:00
Max U
c50bfc2c71 edit unittests for JSON parser 2018-06-27 16:09:14 -07:00
Max U
420dafd591 add "field_tags" to json parser config 2018-06-27 16:09:05 -07:00
34 changed files with 303 additions and 623 deletions

View File

@@ -9,7 +9,6 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only) 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd) 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard) 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)
Telegraf metrics, like InfluxDB Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -105,9 +104,10 @@ but can be overridden using the `name_override` config option.
#### JSON Configuration: #### JSON Configuration:
The JSON data format supports specifying "tag keys". If specified, keys The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist, will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics. they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
all int and float values will be set as fields by default.
For example, if you had this configuration: For example, if you had this configuration:
@@ -174,6 +174,7 @@ For example, if the following configuration:
"my_tag_1", "my_tag_1",
"my_tag_2" "my_tag_2"
] ]
field_keys = ["b_c"]
``` ```
with this JSON output from a command: with this JSON output from a command:
@@ -199,11 +200,11 @@ with this JSON output from a command:
] ]
``` ```
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
``` ```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
``` ```
# Value: # Value:
@@ -652,106 +653,5 @@ For more information about the dropwizard json format see
# [inputs.exec.dropwizard_tag_paths] # [inputs.exec.dropwizard_tag_paths]
# tag1 = "tags.tag1" # tag1 = "tags.tag1"
# tag2 = "tags.tag2" # tag2 = "tags.tag2"
```
#### Grok
Parse logstash-style "grok" patterns. Patterns can be added to patterns, or custom patterns read from custom_pattern_files.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
#### Grok Configuration:
```toml
[[inputs.reader]]
## This is a list of patterns to check the given log file(s) for.
## Note that adding patterns here increases processing time. The most
## efficient configuration is to have one pattern per logparser.
## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
grok_patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name.
grok_name_override = "apache_access_log"
## Full path(s) to custom pattern files.
grok_custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line.
grok_custom_patterns = '''
## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45 data one two 5.43µs
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
grok_timezone = "Canada/Eastern"
``` ```

View File

@@ -1261,6 +1261,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
if node, ok := tbl.Fields["field_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FieldKeys = append(c.FieldKeys, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["data_type"]; ok { if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if str, ok := kv.Value.(*ast.String); ok {
@@ -1338,65 +1350,13 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
//for grok data_format
if node, ok := tbl.Fields["grok_named_patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.NamedPatterns = append(c.NamedPatterns, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["grok_patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.Patterns = append(c.Patterns, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["grok_custom_patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CustomPatterns = str.Value
}
}
}
if node, ok := tbl.Fields["grok_custom_pattern_files"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CustomPatternFiles = append(c.CustomPatternFiles, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["grok_timezone"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.TimeZone = str.Value
}
}
}
c.MetricName = name c.MetricName = name
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
delete(tbl.Fields, "separator") delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates") delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys") delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "field_keys")
delete(tbl.Fields, "data_type") delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file") delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level") delete(tbl.Fields, "collectd_security_level")
@@ -1406,11 +1366,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_time_format")
delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tags_path")
delete(tbl.Fields, "dropwizard_tag_paths") delete(tbl.Fields, "dropwizard_tag_paths")
delete(tbl.Fields, "grok_named_patterns")
delete(tbl.Fields, "grok_patterns")
delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
return parsers.NewParser(c) return parsers.NewParser(c)
} }

View File

@@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.") "Testdata did not produce correct memcached metadata.")
ex := inputs.Inputs["exec"]().(*exec.Exec) ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil) p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err) assert.NoError(t, err)
ex.SetParser(p) ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar" ex.Command = "/usr/bin/myothercollector --foo=bar"

View File

@@ -85,7 +85,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/puppetagent" _ "github.com/influxdata/telegraf/plugins/inputs/puppetagent"
_ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/reader"
_ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/riak"

View File

@@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
} }
func TestExec(t *testing.T) { func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock([]byte(validJson), nil), runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"}, Commands: []string{"testcommand arg1"},
@@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
} }
func TestExecMalformed(t *testing.T) { func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil), runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"}, Commands: []string{"badcommand arg1"},
@@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
} }
func TestCommandError(t *testing.T) { func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"}, Commands: []string{"badcommand"},

View File

@@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url}, URLs: []string{url},
} }
metricName := "metricName" metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url}, URLs: []string{url},
Headers: map[string]string{header: headerValue}, Headers: map[string]string{header: headerValue},
} }
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil) p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
} }
metricName := "metricName" metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil) p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST", Method: "POST",
} }
metricName := "metricName" p, _ := parsers.NewParser(&parsers.Config{
p, _ := parsers.NewJSONParser(metricName, nil, nil) DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator

View File

@@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL, "server": serverURL,
} }
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc k.acc = &acc
defer close(k.done) defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgJSON) in <- saramaMsg(testMsgJSON)
acc.Wait(1) acc.Wait(1)

View File

@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc k.acc = &acc
defer close(k.done) defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgJSON) in <- saramaMsg(testMsgJSON)
acc.Wait(1) acc.Wait(1)

View File

@@ -68,10 +68,10 @@ type Parser struct {
// specified by the user in Patterns. // specified by the user in Patterns.
// They will look like: // They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc. // GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
NamedPatterns []string namedPatterns []string
CustomPatterns string CustomPatterns string
CustomPatternFiles []string CustomPatternFiles []string
MetricName string Measurement string
// Timezone is an optional component to help render log dates to // Timezone is an optional component to help render log dates to
// your chosen zone. // your chosen zone.
@@ -133,7 +133,7 @@ func (p *Parser) Compile() error {
// Give Patterns fake names so that they can be treated as named // Give Patterns fake names so that they can be treated as named
// "custom patterns" // "custom patterns"
p.NamedPatterns = make([]string, 0, len(p.Patterns)) p.namedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns { for i, pattern := range p.Patterns {
pattern = strings.TrimSpace(pattern) pattern = strings.TrimSpace(pattern)
if pattern == "" { if pattern == "" {
@@ -141,10 +141,10 @@ func (p *Parser) Compile() error {
} }
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n" p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}") p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
} }
if len(p.NamedPatterns) == 0 { if len(p.namedPatterns) == 0 {
return fmt.Errorf("pattern required") return fmt.Errorf("pattern required")
} }
@@ -167,6 +167,10 @@ func (p *Parser) Compile() error {
p.addCustomPatterns(scanner) p.addCustomPatterns(scanner)
} }
if p.Measurement == "" {
p.Measurement = "logparser_grok"
}
p.loc, err = time.LoadLocation(p.Timezone) p.loc, err = time.LoadLocation(p.Timezone)
if err != nil { if err != nil {
log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone) log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
@@ -187,7 +191,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var values map[string]string var values map[string]string
// the matching pattern string // the matching pattern string
var patternName string var patternName string
for _, pattern := range p.NamedPatterns { for _, pattern := range p.namedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil { if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err return nil, err
} }
@@ -331,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc) ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts
@@ -344,26 +351,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("logparser_grok: must have one or more fields") return nil, fmt.Errorf("logparser_grok: must have one or more fields")
} }
return metric.New(p.MetricName, tags, fields, p.tsModder.tsMod(timestamp)) return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
lines := strings.Split(string(buf), "\n")
var metrics []telegraf.Metric
for _, line := range lines {
m, err := p.ParseLine(line)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
//needs implementation
} }
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {

View File

@@ -4,18 +4,79 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestGrokParse(t *testing.T) { var benchM telegraf.Metric
parser := Parser{
MetricName: "t_met", func Benchmark_ParseLine_CommonLogFormat(b *testing.B) {
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, p := &Parser{
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
parser.Compile() _ = p.Compile()
_, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`))
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
}
benchM = m
}
func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) {
p := &Parser{
Patterns: []string{"%{COMBINED_LOG_FORMAT}"},
}
_ = p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "-" "Mozilla"`)
}
benchM = m
}
func Benchmark_ParseLine_CustomPattern(b *testing.B) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
_ = p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
}
benchM = m
}
// Test a very simple parse pattern.
func TestSimpleParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TESTLOG}"},
CustomPatterns: `
TESTLOG %{NUMBER:num:int} %{WORD:client}
`,
}
assert.NoError(t, p.Compile())
m, err := p.ParseLine(`142 bot`)
assert.NoError(t, err) assert.NoError(t, err)
require.NotNil(t, m)
assert.Equal(t,
map[string]interface{}{
"num": int64(142),
"client": "bot",
},
m.Fields())
} }
// Verify that patterns with a regex lookahead fail at compile time. // Verify that patterns with a regex lookahead fail at compile time.
@@ -35,7 +96,8 @@ func TestParsePatternsWithLookahead(t *testing.T) {
func TestMeasurementName(t *testing.T) { func TestMeasurementName(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, Measurement: "my_web_log",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
assert.NoError(t, p.Compile()) assert.NoError(t, p.Compile())
@@ -54,11 +116,13 @@ func TestMeasurementName(t *testing.T) {
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
assert.Equal(t, "my_web_log", m.Name())
} }
func TestCLF_IPv6(t *testing.T) { func TestCLF_IPv6(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, Measurement: "my_web_log",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
assert.NoError(t, p.Compile()) assert.NoError(t, p.Compile())
@@ -76,6 +140,7 @@ func TestCLF_IPv6(t *testing.T) {
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
assert.Equal(t, "my_web_log", m.Name())
m, err = p.ParseLine(`::1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`) m, err = p.ParseLine(`::1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
require.NotNil(t, m) require.NotNil(t, m)
@@ -91,6 +156,7 @@ func TestCLF_IPv6(t *testing.T) {
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
assert.Equal(t, "my_web_log", m.Name())
} }
func TestCustomInfluxdbHttpd(t *testing.T) { func TestCustomInfluxdbHttpd(t *testing.T) {
@@ -904,3 +970,33 @@ func TestNewlineInPatterns(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, m) require.NotNil(t, m)
} }
func TestSyslogTimestampParser(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}

View File

@@ -14,8 +14,9 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
// Parsers // Parsers
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
) )
const ( const (
@@ -35,10 +36,9 @@ type logEntry struct {
// LogParserPlugin is the primary struct to implement the interface for logparser plugin // LogParserPlugin is the primary struct to implement the interface for logparser plugin
type LogParserPlugin struct { type LogParserPlugin struct {
Files []string Files []string
FromBeginning bool FromBeginning bool
WatchMethod string WatchMethod string
MeasurementName string `toml:"measurement"`
tailers map[string]*tail.Tail tailers map[string]*tail.Tail
lines chan logEntry lines chan logEntry
@@ -49,13 +49,7 @@ type LogParserPlugin struct {
sync.Mutex sync.Mutex
GrokParser parsers.Parser `toml:"grok"` GrokParser *grok.Parser `toml:"grok"`
Patterns []string
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
TimeZone string
} }
const sampleConfig = ` const sampleConfig = `
@@ -138,21 +132,6 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
// Looks for fields which implement LogParser interface // Looks for fields which implement LogParser interface
l.parsers = []LogParser{} l.parsers = []LogParser{}
config := &parsers.Config{
Patterns: l.Patterns,
NamedPatterns: l.NamedPatterns,
CustomPatterns: l.CustomPatterns,
CustomPatternFiles: l.CustomPatternFiles,
TimeZone: l.TimeZone,
DataFormat: "grok",
}
var err error
l.GrokParser, err = parsers.NewParser(config)
if err != nil {
return err
}
s := reflect.ValueOf(l).Elem() s := reflect.ValueOf(l).Elem()
for i := 0; i < s.NumField(); i++ { for i := 0; i < s.NumField(); i++ {
f := s.Field(i) f := s.Field(i)
@@ -173,6 +152,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("logparser input plugin: no parser defined") return fmt.Errorf("logparser input plugin: no parser defined")
} }
// compile log parser patterns:
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
return err
}
}
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()
@@ -261,8 +247,8 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
} }
} }
// parse is launched as a goroutine to watch the l.lines channel. // parser is launched as a goroutine to watch the l.lines channel.
// when a line is available, parse parses it and adds the metric(s) to the // when a line is available, parser parses it and adds the metric(s) to the
// accumulator. // accumulator.
func (l *LogParserPlugin) parser() { func (l *LogParserPlugin) parser() {
defer l.wg.Done() defer l.wg.Done()
@@ -279,17 +265,18 @@ func (l *LogParserPlugin) parser() {
continue continue
} }
} }
m, err = l.GrokParser.ParseLine(entry.line) for _, parser := range l.parsers {
if err == nil { m, err = parser.ParseLine(entry.line)
if m != nil { if err == nil {
tags := m.Tags() if m != nil {
tags["path"] = entry.path tags := m.Tags()
l.acc.AddFields(l.MeasurementName, m.Fields(), tags, m.Time()) tags["path"] = entry.path
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
log.Println("E! Error parsing log line: " + err.Error())
} }
} else {
log.Println("E! Error parsing log line: " + err.Error())
} }
} }
} }

View File

@@ -2,7 +2,6 @@ package logparser
import ( import (
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"runtime" "runtime"
"strings" "strings"
@@ -10,6 +9,8 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -25,14 +26,17 @@ func TestStartNoParsers(t *testing.T) {
func TestGrokParseLogFilesNonExistPattern(t *testing.T) { func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
p := &grok.Parser{
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
Patterns: []string{"%{FOOBAR}"}, Patterns: []string{"%{FOOBAR}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"}, CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
} }
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
GrokParser: p,
}
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
err := logparser.Start(&acc) err := logparser.Start(&acc)
assert.Error(t, err) assert.Error(t, err)
@@ -40,13 +44,15 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFiles(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
p := &grok.Parser{
logparser := &LogParserPlugin{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"}, CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
FromBeginning: true, }
Files: []string{thisdir + "grok/testdata/*.log"},
MeasurementName: "logparser_grok", logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
GrokParser: p,
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
@@ -56,7 +62,6 @@ func TestGrokParseLogFiles(t *testing.T) {
logparser.Stop() logparser.Stop()
log.Printf("metric[0] %v, tags: %v, fields: %v", acc.Metrics[0].Measurement, acc.Metrics[0].Tags, acc.Metrics[0].Fields)
acc.AssertContainsTaggedFields(t, "logparser_grok", acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{ map[string]interface{}{
"clientip": "192.168.1.1", "clientip": "192.168.1.1",
@@ -86,13 +91,15 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
thisdir := getCurrentDir() thisdir := getCurrentDir()
p := &grok.Parser{
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"}, CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
MeasurementName: "logparser_grok", }
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
GrokParser: p,
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
@@ -123,13 +130,16 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
// pattern available for test_b.log // pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) { func TestGrokParseLogFilesOneBad(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
p := &grok.Parser{
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/test_a.log"},
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"}, CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
MeasurementName: "logparser_grok", }
assert.NoError(t, p.Compile())
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/test_a.log"},
GrokParser: p,
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}

View File

@@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc n.acc = &acc
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
go n.receiver() go n.receiver()
in <- mqttMsg(testMsgJSON) in <- mqttMsg(testMsgJSON)

View File

@@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc n.acc = &acc
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgJSON) in <- natsMsg(testMsgJSON)

View File

@@ -1,23 +0,0 @@
# Reader Input Plugin
The Reader Plugin updates a list of files every interval and parses the data inside.
Files will always be read from the beginning.
This plugin can parse any "data_format" formats.
### Configuration:
```toml
[[inputs.reader]]
## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
```

View File

@@ -1,13 +0,0 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./json_a.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,14 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,7 +0,0 @@
[[inputs.reader]]
files = ["/var/log/test.log"]
data_format = "json"
name_override = "json_reader"
[[outputs.file]]
files = ["stdout"]

View File

@@ -1,95 +0,0 @@
package reader
import (
"fmt"
"io/ioutil"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Reader struct {
Filepaths []string `toml:"files"`
FromBeginning bool
parser parsers.Parser
Filenames []string
}
const sampleConfig = `## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
`
// SampleConfig returns the default configuration of the Input
func (r *Reader) SampleConfig() string {
return sampleConfig
}
func (r *Reader) Description() string {
return "reload and gather from file[s] on telegraf's interval"
}
func (r *Reader) Gather(acc telegraf.Accumulator) error {
r.refreshFilePaths()
for _, k := range r.Filenames {
metrics, err := r.readMetric(k)
if err != nil {
return err
}
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}
return nil
}
func (r *Reader) SetParser(p parsers.Parser) {
r.parser = p
}
func (r *Reader) refreshFilePaths() error {
var allFiles []string
for _, filepath := range r.Filepaths {
g, err := globpath.Compile(filepath)
if err != nil {
return fmt.Errorf("E! Error Glob: %v could not be compiled, %s", filepath, err)
}
files := g.Match()
for k := range files {
allFiles = append(allFiles, k)
}
}
r.Filenames = allFiles
return nil
}
func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) {
fileContents, err := ioutil.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("E! Error file: %v could not be read, %s", filename, err)
}
return r.parser.Parse(fileContents)
}
func init() {
inputs.Add("reader", func() telegraf.Input {
return &Reader{}
})
}

View File

@@ -1,64 +0,0 @@
package reader
import (
"runtime"
"strings"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestRefreshFilePaths(t *testing.T) {
testDir := getPluginDir()
r := Reader{
Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"},
}
r.refreshFilePaths()
assert.Equal(t, len(r.Filenames), 2)
}
func TestJSONParserCompile(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "json",
TagKeys: []string{"parent_ignored_child"},
}
nParser, err := parsers.NewParser(&parserConfig)
r.parser = nParser
assert.NoError(t, err)
r.Gather(&acc)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "grok",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
nParser, err := parsers.NewParser(&parserConfig)
r.parser = nParser
assert.NoError(t, err)
err = r.Gather(&acc)
assert.Equal(t, 2, len(acc.Metrics))
}
func getPluginDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "/reader/reader_test.go", "", 1)
}

View File

@@ -1,2 +0,0 @@
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45

View File

@@ -1,14 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc listener.acc = &acc
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1) listener.wg.Add(1)
go listener.tcpParser() go listener.tcpParser()

View File

@@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc listener.acc = &acc
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1) listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()

View File

@@ -1,73 +0,0 @@
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}

Binary file not shown.

View File

@@ -20,6 +20,7 @@ var (
type JSONParser struct { type JSONParser struct {
MetricName string MetricName string
TagKeys []string TagKeys []string
FieldKeys []string
DefaultTags map[string]string DefaultTags map[string]string
} }
@@ -86,6 +87,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
} }
} }
//if field_keys is specified, only those values should be reported as fields
if len(p.FieldKeys) > 0 {
nFields := make(map[string]interface{})
for _, name := range p.FieldKeys {
if fields[name] != nil {
nFields[name] = fields[name]
}
}
return tags, nFields
}
//remove any additional string/bool values from fields //remove any additional string/bool values from fields
for k := range fields { for k := range fields {
switch fields[k].(type) { switch fields[k].(type) {

View File

@@ -1,6 +1,7 @@
package json package json
import ( import (
"log"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -448,22 +449,28 @@ func TestJSONParseNestedArray(t *testing.T) {
"total_devices": 5, "total_devices": 5,
"total_threads": 10, "total_threads": 10,
"shares": { "shares": {
"total": 5, "total": 5,
"accepted": 5, "accepted": 5,
"rejected": 0, "rejected": 0,
"avg_find_time": 4, "avg_find_time": 4,
"tester": "work", "tester": "work",
"tester2": "don't want this", "tester2": "don't want this",
"tester3": 7.93 "tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
} }
}` }`
parser := JSONParser{ parser := JSONParser{
MetricName: "json_test", MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"}, TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
} }
metrics, err := parser.Parse([]byte(testString)) metrics, err := parser.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags())) require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
} }

View File

@@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -57,6 +56,8 @@ type Config struct {
// TagKeys only apply to JSON data // TagKeys only apply to JSON data
TagKeys []string TagKeys []string
// FieldKeys only apply to JSON
FieldKeys []string
// MetricName applies to JSON & value. This will be the name of the measurement. // MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string MetricName string
@@ -88,13 +89,6 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values // an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags // used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string DropwizardTagPathsMap map[string]string
//grok patterns
Patterns []string
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
TimeZone string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@@ -103,8 +97,8 @@ func NewParser(config *Config) (Parser, error) {
var parser Parser var parser Parser
switch config.DataFormat { switch config.DataFormat {
case "json": case "json":
parser, err = NewJSONParser(config.MetricName, parser, err = newJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags) config.TagKeys, config.FieldKeys, config.DefaultTags)
case "value": case "value":
parser, err = NewValueParser(config.MetricName, parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags) config.DataType, config.DefaultTags)
@@ -128,38 +122,28 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "grok":
parser, err = newGrokParser(
config.MetricName,
config.Patterns,
config.NamedPatterns,
config.CustomPatterns,
config.CustomPatternFiles,
config.TimeZone)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
return parser, err return parser, err
} }
func newGrokParser(metricName string, func newJSONParser(
patterns []string, metricName string,
nPatterns []string, tagKeys []string,
cPatterns string, fieldKeys []string,
cPatternFiles []string, tZone string) (Parser, error) { defaultTags map[string]string,
parser := grok.Parser{ ) (Parser, error) {
MetricName: metricName, parser := &json.JSONParser{
Patterns: patterns, MetricName: metricName,
NamedPatterns: nPatterns, TagKeys: tagKeys,
CustomPatterns: cPatterns, FieldKeys: fieldKeys,
CustomPatternFiles: cPatternFiles, DefaultTags: defaultTags,
Timezone: tZone,
} }
return parser, nil
err := parser.Compile()
return &parser, err
} }
//Deprecated: Use NewParser to get a JSONParser object
func NewJSONParser( func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,