From e12eced211aa1d185c89e959a874b6fb3aff0fb9 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 11:26:14 -0700 Subject: [PATCH 01/18] input plugin that reads files each interval --- plugins/inputs/reader/reader.go | 106 +++++++++++++++++++++ plugins/inputs/reader/reader_test.go | 41 ++++++++ plugins/inputs/reader/testfiles/json_a.log | 14 +++ 3 files changed, 161 insertions(+) create mode 100644 plugins/inputs/reader/reader.go create mode 100644 plugins/inputs/reader/reader_test.go create mode 100644 plugins/inputs/reader/testfiles/json_a.log diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go new file mode 100644 index 000000000..74b180e25 --- /dev/null +++ b/plugins/inputs/reader/reader.go @@ -0,0 +1,106 @@ +package reader + +import ( + "io/ioutil" + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type Reader struct { + Filepaths []string `toml:"files"` + FromBeginning bool + DataFormat string `toml:"data_format"` + ParserConfig parsers.Config + Parser parsers.Parser + Tags []string + + Filenames []string +} + +const sampleConfig = `## Log files to parse. +## These accept standard unix glob matching rules, but with the addition of +## ** as a "super asterisk". ie: +## /var/log/**.log -> recursively find all .log files in /var/log +## /var/log/*/*.log -> find all .log files with a parent dir in /var/log +## /var/log/apache.log -> only tail the apache log file +files = ["/var/log/apache/access.log"] + +## The dataformat to be read from files +## Each data format has its own unique set of configuration options, read +## more about them here: +## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +data_format = "" +'''` + +// SampleConfig returns the default configuration of the Input +func (r *Reader) SampleConfig() string { + return sampleConfig +} + +func (r *Reader) Description() string { + return "reload and gather from file[s] on telegraf's interval" +} + +func (r *Reader) Gather(acc telegraf.Accumulator) error { + r.refreshFilePaths() + for _, k := range r.Filenames { + metrics, err := r.readMetric(k) + if err != nil { + return err + } + + for _, m := range metrics { + acc.AddFields(m.Name(), m.Fields(), m.Tags()) + } + } + return nil +} + +func (r *Reader) compileParser() { + if r.DataFormat == "grok" { + log.Printf("Grok isn't supported yet") + return + } + r.ParserConfig = parsers.Config{ + DataFormat: r.DataFormat, + TagKeys: r.Tags, + } + nParser, err := parsers.NewParser(&r.ParserConfig) + if err != nil { + log.Printf("E! Error building parser: %v", err) + } + + r.Parser = nParser +} + +func (r *Reader) refreshFilePaths() { + var allFiles []string + for _, filepath := range r.Filepaths { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) + continue + } + files := g.Match() + + for k := range files { + allFiles = append(allFiles, k) + } + } + + r.Filenames = allFiles +} + +//requires that Parser has been compiled +func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { + fileContents, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("E! 
File could not be opened: %v", filename) + } + + return r.Parser.Parse(fileContents) + +} diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go new file mode 100644 index 000000000..e073a6f54 --- /dev/null +++ b/plugins/inputs/reader/reader_test.go @@ -0,0 +1,41 @@ +package reader + +import ( + "log" + "runtime" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func TestRefreshFilePaths(t *testing.T) { + testDir := getPluginDir() + r := Reader{ + Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"}, + } + + r.refreshFilePaths() + //log.Printf("filenames: %v", filenames) + assert.Equal(t, len(r.Filenames), 2) +} +func TestJSONParserCompile(t *testing.T) { + testDir := getPluginDir() + var acc testutil.Accumulator + r := Reader{ + Filepaths: []string{testDir + "/reader/testfiles/**.log"}, + DataFormat: "json", + Tags: []string{"parent_ignored_child"}, + } + r.compileParser() + r.Gather(&acc) + log.Printf("acc: %v", acc.Metrics[0].Tags) + assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) + assert.Equal(t, 5, len(acc.Metrics[0].Fields)) +} + +func getPluginDir() string { + _, filename, _, _ := runtime.Caller(1) + return strings.Replace(filename, "/reader/reader_test.go", "", 1) +} diff --git a/plugins/inputs/reader/testfiles/json_a.log b/plugins/inputs/reader/testfiles/json_a.log new file mode 100644 index 000000000..739fd65d8 --- /dev/null +++ b/plugins/inputs/reader/testfiles/json_a.log @@ -0,0 +1,14 @@ +{ + "parent": { + "child": 3.0, + "ignored_child": "hi" + }, + "ignored_null": null, + "integer": 4, + "list": [3, 4], + "ignored_parent": { + "another_ignored_null": null, + "ignored_string": "hello, world!" + }, + "another_list": [4] + } \ No newline at end of file From 08a11d7bfd3a9ca2414e777a5fee3b33856a17e1 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 11:44:02 -0700 Subject: [PATCH 02/18] change config file --- plugins/inputs/reader/reader.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index 74b180e25..a8bf5c468 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -20,7 +20,7 @@ type Reader struct { Filenames []string } -const sampleConfig = `## Log files to parse. +const sampleConfig = `## Files to parse. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". 
ie: ## /var/log/**.log -> recursively find all .log files in /var/log @@ -32,8 +32,7 @@ files = ["/var/log/apache/access.log"] ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md -data_format = "" -'''` +data_format = ""` // SampleConfig returns the default configuration of the Input func (r *Reader) SampleConfig() string { From 9c4b52256ddab66f696745d5e0a4d652e0cfe026 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 13:13:46 -0700 Subject: [PATCH 03/18] tweak metric output --- plugins/inputs/reader/reader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index a8bf5c468..274f22a66 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -59,6 +59,10 @@ func (r *Reader) Gather(acc telegraf.Accumulator) error { } func (r *Reader) compileParser() { + if r.DataFormat == "" { + log.Printf("E! No data_format specified") + return + } if r.DataFormat == "grok" { log.Printf("Grok isn't supported yet") return From 4e24a1bbe35abf9b850b4c29bcd5d97b6b63c053 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 15:56:20 -0700 Subject: [PATCH 04/18] add grok as a top level parser --- plugins/inputs/reader/reader.go | 16 ++++++++++---- plugins/inputs/reader/reader_test.go | 17 +++++++++++++++ plugins/parsers/registry.go | 31 ++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index 274f22a66..36c2a742e 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -18,6 +18,12 @@ type Reader struct { Tags []string Filenames []string + + //for grok parser + Patterns []string + namedPatterns []string + CustomPatterns string + CustomPatternFiles []string } const sampleConfig = `## Files to parse. @@ -63,13 +69,15 @@ func (r *Reader) compileParser() { log.Printf("E! 
No data_format specified") return } - if r.DataFormat == "grok" { - log.Printf("Grok isn't supported yet") - return - } r.ParserConfig = parsers.Config{ DataFormat: r.DataFormat, TagKeys: r.Tags, + + //grok settings + Patterns: r.Patterns, + NamedPatterns: r.namedPatterns, + CustomPatterns: r.CustomPatterns, + CustomPatternFiles: r.CustomPatternFiles, } nParser, err := parsers.NewParser(&r.ParserConfig) if err != nil { diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go index e073a6f54..a30ec2f92 100644 --- a/plugins/inputs/reader/reader_test.go +++ b/plugins/inputs/reader/reader_test.go @@ -35,6 +35,23 @@ func TestJSONParserCompile(t *testing.T) { assert.Equal(t, 5, len(acc.Metrics[0].Fields)) } +func TestGrokParser(t *testing.T) { + testDir := getPluginDir() + var acc testutil.Accumulator + r := Reader{ + Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, + DataFormat: "grok", + Patterns: []string{"%{COMMON_LOG_FORMAT}"}, + } + + r.compileParser() + err := r.Gather(&acc) + log.Printf("err: %v", err) + log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields) + log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields) + t.Error() +} + func getPluginDir() string { _, filename, _, _ := runtime.Caller(1) return strings.Replace(filename, "/reader/reader_test.go", "", 1) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 58fce1722..e15632b8e 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/graphite" + "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -87,6 +88,12 @@ type Config struct { // an optional map containing tag names as keys and json paths to retrieve the tag values from as values // used if TagsPath is empty or doesn't return any tags DropwizardTagPathsMap map[string]string + + //grok patterns + Patterns []string + NamedPatterns []string + CustomPatterns string + CustomPatternFiles []string } // NewParser returns a Parser interface based on the given config. 
@@ -120,12 +127,36 @@ func NewParser(config *Config) (Parser, error) { config.DefaultTags, config.Separator, config.Templates) + case "grok": + parser, err = NewGrokParser( + config.MetricName, + config.Patterns, + config.NamedPatterns, + config.CustomPatterns, + config.CustomPatternFiles) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } return parser, err } +func NewGrokParser(metricName string, + patterns []string, + nPatterns []string, + cPatterns string, + cPatternFiles []string) (Parser, error) { + parser := grok.Parser{ + Measurement: metricName, + Patterns: patterns, + NamedPatterns: nPatterns, + CustomPatterns: cPatterns, + CustomPatternFiles: cPatternFiles, + } + + parser.Compile() + return &parser, nil +} + func NewJSONParser( metricName string, tagKeys []string, From ec7f13111f7a12e0b0b7668882379af145b3cefa Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 16:06:36 -0700 Subject: [PATCH 05/18] add more test files --- plugins/inputs/reader/testfiles/grok_a.log | 2 + plugins/parsers/grok/influx-patterns | 73 +++ plugins/parsers/grok/influx_patterns.go | 78 +++ plugins/parsers/grok/parser.go | 527 +++++++++++++++++++++ plugins/parsers/grok/parser_test.go | 21 + 5 files changed, 701 insertions(+) create mode 100644 plugins/inputs/reader/testfiles/grok_a.log create mode 100644 plugins/parsers/grok/influx-patterns create mode 100644 plugins/parsers/grok/influx_patterns.go create mode 100644 plugins/parsers/grok/parser.go create mode 100644 plugins/parsers/grok/parser_test.go diff --git a/plugins/inputs/reader/testfiles/grok_a.log b/plugins/inputs/reader/testfiles/grok_a.log new file mode 100644 index 000000000..5295fcb75 --- /dev/null +++ b/plugins/inputs/reader/testfiles/grok_a.log @@ -0,0 +1,2 @@ +127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 +128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45 \ No newline at end of file diff --git a/plugins/parsers/grok/influx-patterns b/plugins/parsers/grok/influx-patterns new file mode 100644 index 000000000..931b61bc8 --- /dev/null +++ b/plugins/parsers/grok/influx-patterns @@ -0,0 +1,73 @@ +# Captures are a slightly modified version of logstash "grok" patterns, with +# the format %{[:][:]} +# By default all named captures are converted into string fields. +# Modifiers can be used to convert captures to other types or tags. +# Timestamp modifiers can be used to convert captures to the timestamp of the +# parsed metric. 
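+# Example (assumed usage, consistent with the modifier docs above):
+# %{NUMBER:bytes:int} matches a number and stores it as an integer field named "bytes".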
+ +# View logstash grok pattern docs here: +# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html +# All default logstash patterns are supported, these can be viewed here: +# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns + +# Available modifiers: +# string (default if nothing is specified) +# int +# float +# duration (ie, 5.23ms gets converted to int nanoseconds) +# tag (converts the field into a tag) +# drop (drops the field completely) +# Timestamp modifiers: +# ts-ansic ("Mon Jan _2 15:04:05 2006") +# ts-unix ("Mon Jan _2 15:04:05 MST 2006") +# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") +# ts-rfc822 ("02 Jan 06 15:04 MST") +# ts-rfc822z ("02 Jan 06 15:04 -0700") +# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST") +# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST") +# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700") +# ts-rfc3339 ("2006-01-02T15:04:05Z07:00") +# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00") +# ts-httpd ("02/Jan/2006:15:04:05 -0700") +# ts-epoch (seconds since unix epoch) +# ts-epochnano (nanoseconds since unix epoch) +# ts-"CUSTOM" +# CUSTOM time layouts must be within quotes and be the representation of the +# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006 +# See https://golang.org/pkg/time/#Parse for more details. + +# Example log file pattern, example log looks like this: +# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs +# Breakdown of the DURATION pattern below: +# NUMBER is a builtin logstash grok pattern matching float & int numbers. +# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets. +# s is also regex, this pattern must end in "s". +# so DURATION will match something like '5.324ms' or '6.1µs' or '10s' +DURATION %{NUMBER}[nuµm]?s +RESPONSE_CODE %{NUMBER:response_code:tag} +RESPONSE_TIME %{DURATION:response_time_ns:duration} +EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + +# Wider-ranging username matching vs. logstash built-in %{USER} +NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+ +NGUSER %{NGUSERNAME} +# Wider-ranging client IP matching +CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) + +## +## COMMON LOG PATTERNS +## + +# apache & nginx logs, this is also known as the "common log format" +# see https://en.wikipedia.org/wiki/Common_Log_Format +COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) + +# Combined log format is the same as the common log format but with the addition +# of two quoted strings at the end for "referrer" and "agent" +# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html +COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent} + +# HTTPD log formats +HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg} +HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? 
%{DATA:errorcode}: %{GREEDYDATA:message} +HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} diff --git a/plugins/parsers/grok/influx_patterns.go b/plugins/parsers/grok/influx_patterns.go new file mode 100644 index 000000000..6dc990622 --- /dev/null +++ b/plugins/parsers/grok/influx_patterns.go @@ -0,0 +1,78 @@ +package grok + +// DEFAULT_PATTERNS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns +const DEFAULT_PATTERNS = ` +# Captures are a slightly modified version of logstash "grok" patterns, with +# the format %{[:][:]} +# By default all named captures are converted into string fields. +# Modifiers can be used to convert captures to other types or tags. +# Timestamp modifiers can be used to convert captures to the timestamp of the +# parsed metric. + +# View logstash grok pattern docs here: +# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html +# All default logstash patterns are supported, these can be viewed here: +# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns + +# Available modifiers: +# string (default if nothing is specified) +# int +# float +# duration (ie, 5.23ms gets converted to int nanoseconds) +# tag (converts the field into a tag) +# drop (drops the field completely) +# Timestamp modifiers: +# ts-ansic ("Mon Jan _2 15:04:05 2006") +# ts-unix ("Mon Jan _2 15:04:05 MST 2006") +# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") +# ts-rfc822 ("02 Jan 06 15:04 MST") +# ts-rfc822z ("02 Jan 06 15:04 -0700") +# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST") +# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST") +# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700") +# ts-rfc3339 ("2006-01-02T15:04:05Z07:00") +# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00") +# ts-httpd ("02/Jan/2006:15:04:05 -0700") +# ts-epoch (seconds since unix epoch) +# ts-epochnano (nanoseconds since unix epoch) +# ts-"CUSTOM" +# CUSTOM time layouts must be within quotes and be the representation of the +# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006 +# See https://golang.org/pkg/time/#Parse for more details. + +# Example log file pattern, example log looks like this: +# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs +# Breakdown of the DURATION pattern below: +# NUMBER is a builtin logstash grok pattern matching float & int numbers. +# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets. +# s is also regex, this pattern must end in "s". +# so DURATION will match something like '5.324ms' or '6.1µs' or '10s' +DURATION %{NUMBER}[nuµm]?s +RESPONSE_CODE %{NUMBER:response_code:tag} +RESPONSE_TIME %{DURATION:response_time_ns:duration} +EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + +# Wider-ranging username matching vs. 
logstash built-in %{USER} +NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+ +NGUSER %{NGUSERNAME} +# Wider-ranging client IP matching +CLIENT (?:%{IPV6}|%{IPV4}|%{HOSTNAME}|%{HOSTPORT}) + +## +## COMMON LOG PATTERNS +## + +# apache & nginx logs, this is also known as the "common log format" +# see https://en.wikipedia.org/wiki/Common_Log_Format +COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) + +# Combined log format is the same as the common log format but with the addition +# of two quoted strings at the end for "referrer" and "agent" +# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html +COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent} + +# HTTPD log formats +HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg} +HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message} +HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} +` diff --git a/plugins/parsers/grok/parser.go b/plugins/parsers/grok/parser.go new file mode 100644 index 000000000..e3d2acf3b --- /dev/null +++ b/plugins/parsers/grok/parser.go @@ -0,0 +1,527 @@ +package grok + +import ( + "bufio" + "fmt" + "log" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/vjeantet/grok" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +var timeLayouts = map[string]string{ + "ts-ansic": "Mon Jan _2 15:04:05 2006", + "ts-unix": "Mon Jan _2 15:04:05 MST 2006", + "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006", + "ts-rfc822": "02 Jan 06 15:04 MST", + "ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone + "ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST", + "ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST", + "ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone + "ts-rfc3339": "2006-01-02T15:04:05Z07:00", + "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00", + "ts-httpd": "02/Jan/2006:15:04:05 -0700", + // These three are not exactly "layouts", but they are special cases that + // will get handled in the ParseLine function. + "ts-epoch": "EPOCH", + "ts-epochnano": "EPOCH_NANO", + "ts-syslog": "SYSLOG_TIMESTAMP", + "ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts. +} + +const ( + INT = "int" + TAG = "tag" + FLOAT = "float" + STRING = "string" + DURATION = "duration" + DROP = "drop" + EPOCH = "EPOCH" + EPOCH_NANO = "EPOCH_NANO" + SYSLOG_TIMESTAMP = "SYSLOG_TIMESTAMP" + GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP" +) + +var ( + // matches named captures that contain a modifier. + // ie, + // %{NUMBER:bytes:int} + // %{IPORHOST:clientip:tag} + // %{HTTPDATE:ts1:ts-http} + // %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"} + modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) + // matches a plain pattern name. 
ie, %{NUMBER} + patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) +) + +// Parser is the primary struct to handle and grok-patterns defined in the config toml +type Parser struct { + Patterns []string + // namedPatterns is a list of internally-assigned names to the patterns + // specified by the user in Patterns. + // They will look like: + // GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc. + NamedPatterns []string + CustomPatterns string + CustomPatternFiles []string + Measurement string + + // Timezone is an optional component to help render log dates to + // your chosen zone. + // Default: "" which renders UTC + // Options are as follows: + // 1. Local -- interpret based on machine localtime + // 2. "America/Chicago" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + // 3. UTC -- or blank/unspecified, will return timestamp in UTC + Timezone string + loc *time.Location + + // typeMap is a map of patterns -> capture name -> modifier, + // ie, { + // "%{TESTLOG}": + // { + // "bytes": "int", + // "clientip": "tag" + // } + // } + typeMap map[string]map[string]string + // tsMap is a map of patterns -> capture name -> timestamp layout. + // ie, { + // "%{TESTLOG}": + // { + // "httptime": "02/Jan/2006:15:04:05 -0700" + // } + // } + tsMap map[string]map[string]string + // patterns is a map of all of the parsed patterns from CustomPatterns + // and CustomPatternFiles. + // ie, { + // "DURATION": "%{NUMBER}[nuµm]?s" + // "RESPONSE_CODE": "%{NUMBER:rc:tag}" + // } + patterns map[string]string + // foundTsLayouts is a slice of timestamp patterns that have been found + // in the log lines. This slice gets updated if the user uses the generic + // 'ts' modifier for timestamps. This slice is checked first for matches, + // so that previously-matched layouts get priority over all other timestamp + // layouts. + foundTsLayouts []string + + timeFunc func() time.Time + g *grok.Grok + tsModder *tsModder +} + +// Compile is a bound method to Parser which will process the options for our parser +func (p *Parser) Compile() error { + p.typeMap = make(map[string]map[string]string) + p.tsMap = make(map[string]map[string]string) + p.patterns = make(map[string]string) + p.tsModder = &tsModder{} + var err error + p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true}) + if err != nil { + return err + } + + // Give Patterns fake names so that they can be treated as named + // "custom patterns" + p.NamedPatterns = make([]string, 0, len(p.Patterns)) + for i, pattern := range p.Patterns { + pattern = strings.TrimSpace(pattern) + if pattern == "" { + continue + } + name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) + p.CustomPatterns += "\n" + name + " " + pattern + "\n" + p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}") + } + + if len(p.NamedPatterns) == 0 { + return fmt.Errorf("pattern required") + } + + // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse + // them together as the same type of pattern. + p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns + if len(p.CustomPatterns) != 0 { + scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) + p.addCustomPatterns(scanner) + } + + // Parse any custom pattern files supplied. 
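+	// A file that cannot be opened aborts Compile, so bad paths in
+	// CustomPatternFiles surface at startup rather than per parsed line.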
+ for _, filename := range p.CustomPatternFiles { + file, fileErr := os.Open(filename) + if fileErr != nil { + return fileErr + } + + scanner := bufio.NewScanner(bufio.NewReader(file)) + p.addCustomPatterns(scanner) + } + + if p.Measurement == "" { + p.Measurement = "logparser_grok" + } + + p.loc, err = time.LoadLocation(p.Timezone) + if err != nil { + log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone) + p.loc, _ = time.LoadLocation("UTC") + } + + if p.timeFunc == nil { + p.timeFunc = time.Now + } + + return p.compileCustomPatterns() +} + +// ParseLine is the primary function to process individual lines, returning the metrics +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + var err error + // values are the parsed fields from the log line + var values map[string]string + // the matching pattern string + var patternName string + for _, pattern := range p.NamedPatterns { + if values, err = p.g.Parse(pattern, line); err != nil { + return nil, err + } + if len(values) != 0 { + patternName = pattern + break + } + } + + if len(values) == 0 { + log.Printf("D! Grok no match found for: %q", line) + return nil, nil + } + + fields := make(map[string]interface{}) + tags := make(map[string]string) + timestamp := time.Now() + for k, v := range values { + if k == "" || v == "" { + continue + } + + // t is the modifier of the field + var t string + // check if pattern has some modifiers + if types, ok := p.typeMap[patternName]; ok { + t = types[k] + } + // if we didn't find a modifier, check if we have a timestamp layout + if t == "" { + if ts, ok := p.tsMap[patternName]; ok { + // check if the modifier is a timestamp layout + if layout, ok := ts[k]; ok { + t = layout + } + } + } + // if we didn't find a type OR timestamp modifier, assume string + if t == "" { + t = STRING + } + + switch t { + case INT: + iv, err := strconv.ParseInt(v, 10, 64) + if err != nil { + log.Printf("E! Error parsing %s to int: %s", v, err) + } else { + fields[k] = iv + } + case FLOAT: + fv, err := strconv.ParseFloat(v, 64) + if err != nil { + log.Printf("E! Error parsing %s to float: %s", v, err) + } else { + fields[k] = fv + } + case DURATION: + d, err := time.ParseDuration(v) + if err != nil { + log.Printf("E! Error parsing %s to duration: %s", v, err) + } else { + fields[k] = int64(d) + } + case TAG: + tags[k] = v + case STRING: + fields[k] = strings.Trim(v, `"`) + case EPOCH: + parts := strings.SplitN(v, ".", 2) + if len(parts) == 0 { + log.Printf("E! Error parsing %s to timestamp: %s", v, err) + break + } + + sec, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + log.Printf("E! Error parsing %s to timestamp: %s", v, err) + break + } + ts := time.Unix(sec, 0) + + if len(parts) == 2 { + padded := fmt.Sprintf("%-9s", parts[1]) + nsString := strings.Replace(padded[:9], " ", "0", -1) + nanosec, err := strconv.ParseInt(nsString, 10, 64) + if err != nil { + log.Printf("E! Error parsing %s to timestamp: %s", v, err) + break + } + ts = ts.Add(time.Duration(nanosec) * time.Nanosecond) + } + timestamp = ts + case EPOCH_NANO: + iv, err := strconv.ParseInt(v, 10, 64) + if err != nil { + log.Printf("E! Error parsing %s to int: %s", v, err) + } else { + timestamp = time.Unix(0, iv) + } + case SYSLOG_TIMESTAMP: + ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc) + if err == nil { + if ts.Year() == 0 { + ts = ts.AddDate(timestamp.Year(), 0, 0) + } + timestamp = ts + } else { + log.Printf("E! 
Error parsing %s to time layout [%s]: %s", v, t, err) + } + case GENERIC_TIMESTAMP: + var foundTs bool + // first try timestamp layouts that we've already found + for _, layout := range p.foundTsLayouts { + ts, err := time.ParseInLocation(layout, v, p.loc) + if err == nil { + timestamp = ts + foundTs = true + break + } + } + // if we haven't found a timestamp layout yet, try all timestamp + // layouts. + if !foundTs { + for _, layout := range timeLayouts { + ts, err := time.ParseInLocation(layout, v, p.loc) + if err == nil { + timestamp = ts + foundTs = true + p.foundTsLayouts = append(p.foundTsLayouts, layout) + break + } + } + } + // if we still haven't found a timestamp layout, log it and we will + // just use time.Now() + if !foundTs { + log.Printf("E! Error parsing timestamp [%s], could not find any "+ + "suitable time layouts.", v) + } + case DROP: + // goodbye! + default: + ts, err := time.ParseInLocation(t, v, p.loc) + if err == nil { + timestamp = ts + } else { + log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err) + } + } + } + + if len(fields) == 0 { + return nil, fmt.Errorf("logparser_grok: must have one or more fields") + } + + return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp)) +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + lines := strings.Split(string(buf), "\n") + var metrics []telegraf.Metric + + for _, line := range lines { + m, err := p.ParseLine(line) + if err != nil { + return nil, err + } + metrics = append(metrics, m) + } + + return metrics, nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + //needs implementation +} + +func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) > 0 && line[0] != '#' { + names := strings.SplitN(line, " ", 2) + p.patterns[names[0]] = names[1] + } + } +} + +func (p *Parser) compileCustomPatterns() error { + var err error + // check if the pattern contains a subpattern that is already defined + // replace it with the subpattern for modifier inheritance. + for i := 0; i < 2; i++ { + for name, pattern := range p.patterns { + subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1) + for _, subName := range subNames { + if subPattern, ok := p.patterns[subName[1]]; ok { + pattern = strings.Replace(pattern, subName[0], subPattern, 1) + } + } + p.patterns[name] = pattern + } + } + + // check if pattern contains modifiers. Parse them out if it does. + for name, pattern := range p.patterns { + if modifierRe.MatchString(pattern) { + // this pattern has modifiers, so parse out the modifiers + pattern, err = p.parseTypedCaptures(name, pattern) + if err != nil { + return err + } + p.patterns[name] = pattern + } + } + + return p.g.AddPatternsFromMap(p.patterns) +} + +// parseTypedCaptures parses the capture modifiers, and then deletes the +// modifier from the line so that it is a valid "grok" pattern again. 
+// ie, +// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int) +// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag) +func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { + matches := modifierRe.FindAllStringSubmatch(pattern, -1) + + // grab the name of the capture pattern + patternName := "%{" + name + "}" + // create type map for this pattern + p.typeMap[patternName] = make(map[string]string) + p.tsMap[patternName] = make(map[string]string) + + // boolean to verify that each pattern only has a single ts- data type. + hasTimestamp := false + for _, match := range matches { + // regex capture 1 is the name of the capture + // regex capture 2 is the modifier of the capture + if strings.HasPrefix(match[2], "ts") { + if hasTimestamp { + return pattern, fmt.Errorf("logparser pattern compile error: "+ + "Each pattern is allowed only one named "+ + "timestamp data type. pattern: %s", pattern) + } + if layout, ok := timeLayouts[match[2]]; ok { + // built-in time format + p.tsMap[patternName][match[1]] = layout + } else { + // custom time format + p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`) + } + hasTimestamp = true + } else { + p.typeMap[patternName][match[1]] = match[2] + } + + // the modifier is not a valid part of a "grok" pattern, so remove it + // from the pattern. + pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1) + } + + return pattern, nil +} + +// tsModder is a struct for incrementing identical timestamps of log lines +// so that we don't push identical metrics that will get overwritten. +type tsModder struct { + dupe time.Time + last time.Time + incr time.Duration + incrn time.Duration + rollover time.Duration +} + +// tsMod increments the given timestamp one unit more from the previous +// duplicate timestamp. +// the increment unit is determined as the next smallest time unit below the +// most significant time unit of ts. +// ie, if the input is at ms precision, it will increment it 1µs. 
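+// A burst of identical input timestamps therefore yields a strictly
+// increasing sequence instead of metrics that overwrite each other.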
+func (t *tsModder) tsMod(ts time.Time) time.Time { + defer func() { t.last = ts }() + // don't mod the time if we don't need to + if t.last.IsZero() || ts.IsZero() { + t.incrn = 0 + t.rollover = 0 + return ts + } + if !ts.Equal(t.last) && !ts.Equal(t.dupe) { + t.incr = 0 + t.incrn = 0 + t.rollover = 0 + return ts + } + + if ts.Equal(t.last) { + t.dupe = ts + } + + if ts.Equal(t.dupe) && t.incr == time.Duration(0) { + tsNano := ts.UnixNano() + + d := int64(10) + counter := 1 + for { + a := tsNano % d + if a > 0 { + break + } + d = d * 10 + counter++ + } + + switch { + case counter <= 6: + t.incr = time.Nanosecond + case counter <= 9: + t.incr = time.Microsecond + case counter > 9: + t.incr = time.Millisecond + } + } + + t.incrn++ + if t.incrn == 999 && t.incr > time.Nanosecond { + t.rollover = t.incr * t.incrn + t.incrn = 1 + t.incr = t.incr / 1000 + if t.incr < time.Nanosecond { + t.incr = time.Nanosecond + } + } + return ts.Add(t.incr*t.incrn + t.rollover) +} diff --git a/plugins/parsers/grok/parser_test.go b/plugins/parsers/grok/parser_test.go new file mode 100644 index 000000000..5dc01911c --- /dev/null +++ b/plugins/parsers/grok/parser_test.go @@ -0,0 +1,21 @@ +package grok + +import ( + "log" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGrokParse(t *testing.T) { + parser := Parser{ + Measurement: "t_met", + Patterns: []string{"%{COMMON_LOG_FORMAT}"}, + } + //var acc testutil.Accumulator + parser.Compile() + metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)) + log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields()) + assert.NoError(t, err) + //t.Error() +} From 504d9784465a6250b232e82211e51efe1fd854aa Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 16:12:26 -0700 Subject: [PATCH 06/18] clean up some test cases --- plugins/inputs/reader/reader_test.go | 2 +- plugins/parsers/grok/parser_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go index a30ec2f92..339eba16d 100644 --- a/plugins/inputs/reader/reader_test.go +++ b/plugins/inputs/reader/reader_test.go @@ -49,7 +49,7 @@ func TestGrokParser(t *testing.T) { log.Printf("err: %v", err) log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields) log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields) - t.Error() + assert.Equal(t, 2, len(acc.Metrics)) } func getPluginDir() string { diff --git a/plugins/parsers/grok/parser_test.go b/plugins/parsers/grok/parser_test.go index 5dc01911c..b87c112d2 100644 --- a/plugins/parsers/grok/parser_test.go +++ b/plugins/parsers/grok/parser_test.go @@ -17,5 +17,4 @@ func TestGrokParse(t *testing.T) { metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)) log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields()) assert.NoError(t, err) - //t.Error() } From 542c030dc834abebc83f36637cbaf04c4a63c481 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 21 Jun 2018 16:23:06 -0700 Subject: [PATCH 07/18] knock more errors from test files --- plugins/inputs/reader/reader_test.go | 2 +- plugins/parsers/grok/parser_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go index 
339eba16d..cb6fb44a8 100644 --- a/plugins/inputs/reader/reader_test.go +++ b/plugins/inputs/reader/reader_test.go @@ -24,7 +24,7 @@ func TestJSONParserCompile(t *testing.T) { testDir := getPluginDir() var acc testutil.Accumulator r := Reader{ - Filepaths: []string{testDir + "/reader/testfiles/**.log"}, + Filepaths: []string{testDir + "/reader/testfiles/json_a.log"}, DataFormat: "json", Tags: []string{"parent_ignored_child"}, } diff --git a/plugins/parsers/grok/parser_test.go b/plugins/parsers/grok/parser_test.go index b87c112d2..77818199a 100644 --- a/plugins/parsers/grok/parser_test.go +++ b/plugins/parsers/grok/parser_test.go @@ -12,7 +12,6 @@ func TestGrokParse(t *testing.T) { Measurement: "t_met", Patterns: []string{"%{COMMON_LOG_FORMAT}"}, } - //var acc testutil.Accumulator parser.Compile() metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)) log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields()) From 554b960339faf67ce633bdde46ddc36980ee8b38 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 25 Jun 2018 09:53:35 -0700 Subject: [PATCH 08/18] add setparser to reader --- plugins/inputs/reader/reader.go | 41 ++++++++++++++++++++++++++++++++- plugins/parsers/registry.go | 7 ++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index 36c2a742e..504a40b3a 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -24,6 +24,7 @@ type Reader struct { namedPatterns []string CustomPatterns string CustomPatternFiles []string + TZone string } const sampleConfig = `## Files to parse. @@ -38,7 +39,40 @@ files = ["/var/log/apache/access.log"] ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md -data_format = ""` +data_format = "" + +## Parse logstash-style "grok" patterns: +## Telegraf built-in parsing patterns: https://goo.gl/dkay10 +[inputs.logparser.grok] + ## This is a list of patterns to check the given log file(s) for. + ## Note that adding patterns here increases processing time. The most + ## efficient configuration is to have one pattern per logparser. + ## Other common built-in patterns are: + ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) + ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) + patterns = ["%{COMBINED_LOG_FORMAT}"] + + ## Name of the outputted measurement name. + measurement = "apache_access_log" + + ## Full path(s) to custom pattern files. + custom_pattern_files = [] + + ## Custom patterns can also be defined here. Put one pattern per line. + custom_patterns = ''' + ''' + + ## Timezone allows you to provide an override for timestamps that + ## don't already include an offset + ## e.g. 04/06/2016 12:41:45 data one two 5.43µs + ## + ## Default: "" which renders UTC + ## Options are as follows: + ## 1. Local -- interpret based on machine localtime + ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + ## 3. 
UTC -- or blank/unspecified, will return timestamp in UTC + timezone = "Canada/Eastern" +` // SampleConfig returns the default configuration of the Input func (r *Reader) SampleConfig() string { @@ -64,6 +98,10 @@ func (r *Reader) Gather(acc telegraf.Accumulator) error { return nil } +func (r *Reader) SetParser(p parsers.Parser) { + r.Parser = p +} + func (r *Reader) compileParser() { if r.DataFormat == "" { log.Printf("E! No data_format specified") @@ -78,6 +116,7 @@ func (r *Reader) compileParser() { NamedPatterns: r.namedPatterns, CustomPatterns: r.CustomPatterns, CustomPatternFiles: r.CustomPatternFiles, + TimeZone: r.TZone, } nParser, err := parsers.NewParser(&r.ParserConfig) if err != nil { diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index e15632b8e..b657cf83f 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -94,6 +94,7 @@ type Config struct { NamedPatterns []string CustomPatterns string CustomPatternFiles []string + TimeZone string } // NewParser returns a Parser interface based on the given config. @@ -133,7 +134,8 @@ func NewParser(config *Config) (Parser, error) { config.Patterns, config.NamedPatterns, config.CustomPatterns, - config.CustomPatternFiles) + config.CustomPatternFiles, + config.TimeZone) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -144,13 +146,14 @@ func NewGrokParser(metricName string, patterns []string, nPatterns []string, cPatterns string, - cPatternFiles []string) (Parser, error) { + cPatternFiles []string, tZone string) (Parser, error) { parser := grok.Parser{ Measurement: metricName, Patterns: patterns, NamedPatterns: nPatterns, CustomPatterns: cPatterns, CustomPatternFiles: cPatternFiles, + Timezone: tZone, } parser.Compile() From f40371e3615dd40b210b08f6fd90ff31968f5729 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 25 Jun 2018 10:15:32 -0700 Subject: [PATCH 09/18] add init function to reader --- plugins/inputs/reader/reader.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index 504a40b3a..853405745 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -154,3 +155,9 @@ func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { return r.Parser.Parse(fileContents) } + +func init() { + inputs.Add("reader", func() telegraf.Input { + return &Reader{} + }) +} From 9c845950a7973d1e92e12dba8a3ae84fe8f55927 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 25 Jun 2018 15:32:27 -0700 Subject: [PATCH 10/18] add grok as a top level parser, still need README --- internal/config/config.go | 58 ++++++++++++++++++++++ plugins/inputs/all/all.go | 2 +- plugins/inputs/reader/reader.go | 73 ++-------------------------- plugins/inputs/reader/reader_test.go | 26 ++++++++-- 4 files changed, 84 insertions(+), 75 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 8a31c271e..1a98c61c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1338,6 +1338,59 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + //for grok data_format + if node, ok := tbl.Fields["named_patterns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range 
ary.Value { + if str, ok := elem.(*ast.String); ok { + c.NamedPatterns = append(c.NamedPatterns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["patterns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.Patterns = append(c.Patterns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["custom_patterns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CustomPatterns = str.Value + } + } + } + + if node, ok := tbl.Fields["custom_pattern_files"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CustomPatternFiles = append(c.CustomPatternFiles, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["timezone"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.TimeZone = str.Value + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") @@ -1353,6 +1406,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tag_paths") + delete(tbl.Fields, "named_patterns") + delete(tbl.Fields, "patterns") + delete(tbl.Fields, "custom_patterns") + delete(tbl.Fields, "custom_pattern_files") + delete(tbl.Fields, "timezone") return parsers.NewParser(c) } diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b2be2be5a..de34847d6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -85,7 +85,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/puppetagent" _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" _ "github.com/influxdata/telegraf/plugins/inputs/raindrops" - _ "github.com/influxdata/telegraf/plugins/inputs/redis" + _ "github.com/influxdata/telegraf/plugins/inputs/reader" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/salesforce" diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index 853405745..bfccb87d3 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -13,19 +13,9 @@ import ( type Reader struct { Filepaths []string `toml:"files"` FromBeginning bool - DataFormat string `toml:"data_format"` - ParserConfig parsers.Config - Parser parsers.Parser - Tags []string + parser parsers.Parser Filenames []string - - //for grok parser - Patterns []string - namedPatterns []string - CustomPatterns string - CustomPatternFiles []string - TZone string } const sampleConfig = `## Files to parse. @@ -41,38 +31,6 @@ files = ["/var/log/apache/access.log"] ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "" - -## Parse logstash-style "grok" patterns: -## Telegraf built-in parsing patterns: https://goo.gl/dkay10 -[inputs.logparser.grok] - ## This is a list of patterns to check the given log file(s) for. - ## Note that adding patterns here increases processing time. The most - ## efficient configuration is to have one pattern per logparser. 
- ## Other common built-in patterns are: - ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) - ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) - patterns = ["%{COMBINED_LOG_FORMAT}"] - - ## Name of the outputted measurement name. - measurement = "apache_access_log" - - ## Full path(s) to custom pattern files. - custom_pattern_files = [] - - ## Custom patterns can also be defined here. Put one pattern per line. - custom_patterns = ''' - ''' - - ## Timezone allows you to provide an override for timestamps that - ## don't already include an offset - ## e.g. 04/06/2016 12:41:45 data one two 5.43µs - ## - ## Default: "" which renders UTC - ## Options are as follows: - ## 1. Local -- interpret based on machine localtime - ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones - ## 3. UTC -- or blank/unspecified, will return timestamp in UTC - timezone = "Canada/Eastern" ` // SampleConfig returns the default configuration of the Input @@ -100,31 +58,7 @@ func (r *Reader) Gather(acc telegraf.Accumulator) error { } func (r *Reader) SetParser(p parsers.Parser) { - r.Parser = p -} - -func (r *Reader) compileParser() { - if r.DataFormat == "" { - log.Printf("E! No data_format specified") - return - } - r.ParserConfig = parsers.Config{ - DataFormat: r.DataFormat, - TagKeys: r.Tags, - - //grok settings - Patterns: r.Patterns, - NamedPatterns: r.namedPatterns, - CustomPatterns: r.CustomPatterns, - CustomPatternFiles: r.CustomPatternFiles, - TimeZone: r.TZone, - } - nParser, err := parsers.NewParser(&r.ParserConfig) - if err != nil { - log.Printf("E! Error building parser: %v", err) - } - - r.Parser = nParser + r.parser = p } func (r *Reader) refreshFilePaths() { @@ -151,8 +85,7 @@ func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { if err != nil { log.Printf("E! 
File could not be opened: %v", filename) } - - return r.Parser.Parse(fileContents) + return r.parser.Parse(fileContents) } diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go index cb6fb44a8..dcfa9ffc3 100644 --- a/plugins/inputs/reader/reader_test.go +++ b/plugins/inputs/reader/reader_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" ) @@ -28,7 +29,14 @@ func TestJSONParserCompile(t *testing.T) { DataFormat: "json", Tags: []string{"parent_ignored_child"}, } - r.compileParser() + parserConfig := parsers.Config{ + DataFormat: r.DataFormat, + TagKeys: r.Tags, + } + nParser, err := parsers.NewParser(&parserConfig) + r.parser = nParser + assert.NoError(t, err) + r.Gather(&acc) log.Printf("acc: %v", acc.Metrics[0].Tags) assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) @@ -41,15 +49,25 @@ func TestGrokParser(t *testing.T) { r := Reader{ Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, DataFormat: "grok", - Patterns: []string{"%{COMMON_LOG_FORMAT}"}, } - r.compileParser() - err := r.Gather(&acc) + parserConfig := parsers.Config{ + DataFormat: r.DataFormat, + TagKeys: r.Tags, + Patterns: []string{"{%COMMON-LOG-FORMAT}"}, + } + + nParser, err := parsers.NewParser(&parserConfig) + r.parser = nParser + assert.NoError(t, err) + + log.Printf("path: %v", r.Filepaths[0]) + err = r.Gather(&acc) log.Printf("err: %v", err) log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields) log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields) assert.Equal(t, 2, len(acc.Metrics)) + t.Error() } func getPluginDir() string { From cc406299bac7735056961feb28196caf0ce89165 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 25 Jun 2018 15:52:43 -0700 Subject: [PATCH 11/18] allow for import from plugins/all --- plugins/inputs/all/all.go | 1 + plugins/inputs/reader/reader_test.go | 17 ++++++----------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index de34847d6..fc38b70f5 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -86,6 +86,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" _ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/reader" + _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/salesforce" diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go index dcfa9ffc3..ca076350b 100644 --- a/plugins/inputs/reader/reader_test.go +++ b/plugins/inputs/reader/reader_test.go @@ -25,13 +25,11 @@ func TestJSONParserCompile(t *testing.T) { testDir := getPluginDir() var acc testutil.Accumulator r := Reader{ - Filepaths: []string{testDir + "/reader/testfiles/json_a.log"}, - DataFormat: "json", - Tags: []string{"parent_ignored_child"}, + Filepaths: []string{testDir + "/reader/testfiles/json_a.log"}, } parserConfig := parsers.Config{ - DataFormat: r.DataFormat, - TagKeys: r.Tags, + DataFormat: "json", + TagKeys: []string{"parent_ignored_child"}, } nParser, err := parsers.NewParser(&parserConfig) r.parser = nParser @@ -47,14 +45,12 @@ func TestGrokParser(t 
*testing.T) { testDir := getPluginDir() var acc testutil.Accumulator r := Reader{ - Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, - DataFormat: "grok", + Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, } parserConfig := parsers.Config{ - DataFormat: r.DataFormat, - TagKeys: r.Tags, - Patterns: []string{"{%COMMON-LOG-FORMAT}"}, + DataFormat: "grok", + Patterns: []string{"%{COMMON_LOG_FORMAT}"}, } nParser, err := parsers.NewParser(&parserConfig) @@ -67,7 +63,6 @@ func TestGrokParser(t *testing.T) { log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields) log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields) assert.Equal(t, 2, len(acc.Metrics)) - t.Error() } func getPluginDir() string { From 79d9ea476187878b91dcc742c6ecfd75f93806a6 Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 26 Jun 2018 10:26:48 -0700 Subject: [PATCH 12/18] add docker-image spin up for reader --- Makefile | 14 ++- internal/config/config.go | 9 ++ plugins/inputs/reader/dev/docker-compse.yml | 13 +++ plugins/inputs/reader/dev/telegraf.conf | 106 ++++++++++++++++++++ plugins/inputs/reader/reader.go | 8 +- 5 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 plugins/inputs/reader/dev/docker-compse.yml create mode 100644 plugins/inputs/reader/dev/telegraf.conf diff --git a/Makefile b/Makefile index 8650df267..99d7a3a39 100644 --- a/Makefile +++ b/Makefile @@ -92,4 +92,16 @@ docker-image: plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl ragel -Z -G2 $^ -o $@ -.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 +static: + @echo "Building static linux binary..." + @CGO_ENABLED=0 \ + GOOS=linux \ + GOARCH=amd64 \ + go build -ldflags "$(LDFLAGS)" ./cmd/telegraf + +plugin-%: + @echo "Starting dev environment for $${$(@)} input plugin..." 
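+	@# Assumes a compose file exists at plugins/inputs/<plugin>/dev/docker-compose.yml.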
+ @docker-compose -f plugins/inputs/$${$(@)}/dev/docker-compose.yml up + +.PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64 static + diff --git a/internal/config/config.go b/internal/config/config.go index 1a98c61c5..845b0b92c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1393,6 +1393,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { c.MetricName = name + if node, ok := tbl.Fields["name_override"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.MetricName = str.Value + } + } + } + delete(tbl.Fields, "data_format") delete(tbl.Fields, "separator") delete(tbl.Fields, "templates") @@ -1411,6 +1419,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "custom_patterns") delete(tbl.Fields, "custom_pattern_files") delete(tbl.Fields, "timezone") + delete(tbl.Fields, "name_override") return parsers.NewParser(c) } diff --git a/plugins/inputs/reader/dev/docker-compse.yml b/plugins/inputs/reader/dev/docker-compse.yml new file mode 100644 index 000000000..a5abde8ac --- /dev/null +++ b/plugins/inputs/reader/dev/docker-compse.yml @@ -0,0 +1,13 @@ +version: '3' + +# services: + telegraf: + image: glinton/scratch + volumes: + - ./telegraf.conf:/telegraf.conf + - ../../../../telegraf:/telegraf + - ./test.log:/var/log/test.log + entrypoint: + - /telegraf + - --config + - /telegraf.conf \ No newline at end of file diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf new file mode 100644 index 000000000..971845682 --- /dev/null +++ b/plugins/inputs/reader/dev/telegraf.conf @@ -0,0 +1,106 @@ + +# Global tags can be specified here in key="value" format. +[global_tags] + # dc = "us-east-1" # will tag all metrics with dc=us-east-1 + # rack = "1a" + ## Environment variables can be used as tags, and throughout the config file + # user = "$USER" + + +# Configuration for telegraf agent +[agent] + ## Default data collection interval for all inputs + interval = "10s" + ## Rounds collection interval to 'interval' + ## ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + + ## Telegraf will send metrics to outputs in batches of at most + ## metric_batch_size metrics. + ## This controls the size of writes that Telegraf sends to output plugins. + metric_batch_size = 1000 + + ## For failed writes, telegraf will cache metric_buffer_limit metrics for each + ## output, and will flush this buffer on a successful write. Oldest metrics + ## are dropped first when this buffer fills. + ## This buffer only fills when writes fail to output plugin(s). + metric_buffer_limit = 10000 + + ## Collection jitter is used to jitter the collection by a random amount. + ## Each plugin will sleep for a random time within jitter before collecting. + ## This can be used to avoid many plugins querying things like sysfs at the + ## same time, which can have a measurable effect on the system. + collection_jitter = "0s" + + ## Default flushing interval for all outputs. You shouldn't set this below + ## interval. Maximum flush_interval will be flush_interval + flush_jitter + flush_interval = "10s" + ## Jitter the flush interval by a random amount. This is primarily to avoid + ## large write spikes for users running a large number of telegraf instances. 
+  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+  flush_jitter = "0s"
+
+  ## By default or when set to "0s", precision will be set to the same
+  ## timestamp order as the collection interval, with the maximum being 1s.
+  ## ie, when interval = "10s", precision will be "1s"
+  ## when interval = "250ms", precision will be "1ms"
+  ## Precision will NOT be used for service inputs. It is up to each individual
+  ## service input to set the timestamp at the appropriate precision.
+  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+  precision = ""
+
+  ## Logging configuration:
+  ## Run telegraf with debug log messages.
+  debug = false
+  ## Run telegraf in quiet mode (error log messages only).
+  quiet = false
+  ## Specify the log file name. The empty string means to log to stderr.
+  logfile = ""
+
+  ## Override default hostname, if empty use os.Hostname()
+  hostname = ""
+  ## If set to true, do not set the "host" tag in the telegraf agent.
+  omit_hostname = false
+
+# # reload and gather from file[s] on telegraf's interval
+[[inputs.reader]]
+# ## These accept standard unix glob matching rules, but with the addition of
+# ## ** as a "super asterisk". ie:
+# ## /var/log/**.log -> recursively find all .log files in /var/log
+# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
+# ## /var/log/apache.log -> only tail the apache log file
+files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/reader/testfiles/grok_a.log"]
+#
+# ## The dataformat to be read from files
+# ## Each data format has its own unique set of configuration options, read
+# ## more about them here:
+# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+data_format = "grok"
+#
+
+patterns = ["%{COMMON_LOG_FORMAT}"]
+#
+# ## Name of the output measurement.
+measurement = "grok_reader"
+#
+# ## Full path(s) to custom pattern files.
+# custom_pattern_files = []
+#
+# ## Custom patterns can also be defined here. Put one pattern per line.
+# custom_patterns = '''
+# '''
+#
+# ## Timezone allows you to provide an override for timestamps that
+# ## don't already include an offset
+# ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
+# ##
+# ## Default: "" which renders UTC
+# ## Options are as follows:
+# ## 1. Local -- interpret based on machine localtime
+# ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
+# ## 3. UTC -- or blank/unspecified, will return timestamp in UTC
+# timezone = "Canada/Eastern"
+
+
+[[outputs.file]]
+  files = ["stdout"]
\ No newline at end of file
diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go
index bfccb87d3..eb2ce4804 100644
--- a/plugins/inputs/reader/reader.go
+++ b/plugins/inputs/reader/reader.go
@@ -50,7 +50,13 @@ func (r *Reader) Gather(acc telegraf.Accumulator) error {
 			return err
 		}
 
-		for _, m := range metrics {
+		for i, m := range metrics {
+
+			//error if m is nil
+			if m == nil {
+				log.Printf("E! Metric could not be parsed from: %v, on line %v", k, i)
+				continue
+			}
 			acc.AddFields(m.Name(), m.Fields(), m.Tags())
 		}
 	}

From bbd68b382091f8f581c02d5db75ad9fb43e0bbce Mon Sep 17 00:00:00 2001
From: Max U
Date: Tue, 26 Jun 2018 11:26:43 -0700
Subject: [PATCH 13/18] docker will spin up

---
 internal/config/config.go                   |  9 ---------
 plugins/inputs/reader/dev/docker-compse.yml | 13 -------------
 plugins/inputs/reader/dev/telegraf.conf     | 10 +++++-----
 plugins/inputs/reader/reader.go             |  2 +-
 4 files changed, 6 insertions(+), 28 deletions(-)
 delete mode 100644 plugins/inputs/reader/dev/docker-compse.yml

diff --git a/internal/config/config.go b/internal/config/config.go
index 845b0b92c..1a98c61c5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1393,14 +1393,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 
 	c.MetricName = name
 
-	if node, ok := tbl.Fields["name_override"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if str, ok := kv.Value.(*ast.String); ok {
-				c.MetricName = str.Value
-			}
-		}
-	}
-
 	delete(tbl.Fields, "data_format")
 	delete(tbl.Fields, "separator")
 	delete(tbl.Fields, "templates")
@@ -1419,7 +1411,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 	delete(tbl.Fields, "custom_patterns")
 	delete(tbl.Fields, "custom_pattern_files")
 	delete(tbl.Fields, "timezone")
-	delete(tbl.Fields, "name_override")
 
 	return parsers.NewParser(c)
 }
diff --git a/plugins/inputs/reader/dev/docker-compse.yml b/plugins/inputs/reader/dev/docker-compse.yml
deleted file mode 100644
index a5abde8ac..000000000
--- a/plugins/inputs/reader/dev/docker-compse.yml
+++ /dev/null
@@ -1,13 +0,0 @@
-version: '3'
-
-# services:
-  telegraf:
-    image: glinton/scratch
-    volumes:
-      - ./telegraf.conf:/telegraf.conf
-      - ../../../../telegraf:/telegraf
-      - ./test.log:/var/log/test.log
-    entrypoint:
-      - /telegraf
-      - --config
-      - /telegraf.conf
\ No newline at end of file
diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf
index 971845682..76f57f0ad 100644
--- a/plugins/inputs/reader/dev/telegraf.conf
+++ b/plugins/inputs/reader/dev/telegraf.conf
@@ -69,22 +69,22 @@
 # ## /var/log/**.log -> recursively find all .log files in /var/log
 # ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
 # ## /var/log/apache.log -> only tail the apache log file
-files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/reader/testfiles/grok_a.log"]
+files = ["/var/log/test.log"]
 #
 # ## The dataformat to be read from files
 # ## Each data format has its own unique set of configuration options, read
 # ## more about them here:
 # ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
-data_format = "grok"
+data_format = "json"
 #
 
-patterns = ["%{COMMON_LOG_FORMAT}"]
+#patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"]
 #
 # ## Name of the output measurement.
-measurement = "grok_reader"
+#name_override = "grok_reader"
 #
 # ## Full path(s) to custom pattern files.
-# custom_pattern_files = []
+#custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"]
 #
 # ## Custom patterns can also be defined here. Put one pattern per line.
# custom_patterns = ''' diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go index eb2ce4804..e21295611 100644 --- a/plugins/inputs/reader/reader.go +++ b/plugins/inputs/reader/reader.go @@ -18,7 +18,7 @@ type Reader struct { Filenames []string } -const sampleConfig = `## Files to parse. +const sampleConfig = `## Files to parse each interval. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: ## /var/log/**.log -> recursively find all .log files in /var/log From bf7220d2ceac69f2a5b66f273d30497ae9781dfb Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 26 Jun 2018 11:53:40 -0700 Subject: [PATCH 14/18] add test file to docker spin up --- plugins/inputs/reader/dev/docker-compose.yml | 13 +++ plugins/inputs/reader/dev/json_a.log | 14 +++ telegraf.conf | 104 +++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 plugins/inputs/reader/dev/docker-compose.yml create mode 100644 plugins/inputs/reader/dev/json_a.log create mode 100644 telegraf.conf diff --git a/plugins/inputs/reader/dev/docker-compose.yml b/plugins/inputs/reader/dev/docker-compose.yml new file mode 100644 index 000000000..abeaf9315 --- /dev/null +++ b/plugins/inputs/reader/dev/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3' + +services: + telegraf: + image: glinton/scratch + volumes: + - ./telegraf.conf:/telegraf.conf + - ../../../../telegraf:/telegraf + - ./json_a.log:/var/log/test.log + entrypoint: + - /telegraf + - --config + - /telegraf.conf \ No newline at end of file diff --git a/plugins/inputs/reader/dev/json_a.log b/plugins/inputs/reader/dev/json_a.log new file mode 100644 index 000000000..739fd65d8 --- /dev/null +++ b/plugins/inputs/reader/dev/json_a.log @@ -0,0 +1,14 @@ +{ + "parent": { + "child": 3.0, + "ignored_child": "hi" + }, + "ignored_null": null, + "integer": 4, + "list": [3, 4], + "ignored_parent": { + "another_ignored_null": null, + "ignored_string": "hello, world!" + }, + "another_list": [4] + } \ No newline at end of file diff --git a/telegraf.conf b/telegraf.conf new file mode 100644 index 000000000..1a43b5a18 --- /dev/null +++ b/telegraf.conf @@ -0,0 +1,104 @@ + +# Global tags can be specified here in key="value" format. +[global_tags] + # dc = "us-east-1" # will tag all metrics with dc=us-east-1 + # rack = "1a" + ## Environment variables can be used as tags, and throughout the config file + # user = "$USER" + + +# Configuration for telegraf agent +[agent] + ## Default data collection interval for all inputs + interval = "10s" + ## Rounds collection interval to 'interval' + ## ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + + ## Telegraf will send metrics to outputs in batches of at most + ## metric_batch_size metrics. + ## This controls the size of writes that Telegraf sends to output plugins. + metric_batch_size = 1000 + + ## For failed writes, telegraf will cache metric_buffer_limit metrics for each + ## output, and will flush this buffer on a successful write. Oldest metrics + ## are dropped first when this buffer fills. + ## This buffer only fills when writes fail to output plugin(s). + metric_buffer_limit = 10000 + + ## Collection jitter is used to jitter the collection by a random amount. + ## Each plugin will sleep for a random time within jitter before collecting. + ## This can be used to avoid many plugins querying things like sysfs at the + ## same time, which can have a measurable effect on the system. 
+  collection_jitter = "0s"
+
+  ## Default flushing interval for all outputs. You shouldn't set this below
+  ## interval. Maximum flush_interval will be flush_interval + flush_jitter
+  flush_interval = "10s"
+  ## Jitter the flush interval by a random amount. This is primarily to avoid
+  ## large write spikes for users running a large number of telegraf instances.
+  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+  flush_jitter = "0s"
+
+  ## By default or when set to "0s", precision will be set to the same
+  ## timestamp order as the collection interval, with the maximum being 1s.
+  ## ie, when interval = "10s", precision will be "1s"
+  ## when interval = "250ms", precision will be "1ms"
+  ## Precision will NOT be used for service inputs. It is up to each individual
+  ## service input to set the timestamp at the appropriate precision.
+  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+  precision = ""
+
+  ## Logging configuration:
+  ## Run telegraf with debug log messages.
+  debug = false
+  ## Run telegraf in quiet mode (error log messages only).
+  quiet = false
+  ## Specify the log file name. The empty string means to log to stderr.
+  logfile = ""
+
+  ## Override default hostname, if empty use os.Hostname()
+  hostname = ""
+  ## If set to true, do not set the "host" tag in the telegraf agent.
+  omit_hostname = false
+
+# # reload and gather from file[s] on telegraf's interval
+[[inputs.reader]]
+# ## These accept standard unix glob matching rules, but with the addition of
+# ## ** as a "super asterisk". ie:
+# ## /var/log/**.log -> recursively find all .log files in /var/log
+# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
+# ## /var/log/apache.log -> only tail the apache log file
+files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/**.log"]
+#
+# ## The dataformat to be read from files
+# ## Each data format has its own unique set of configuration options, read
+# ## more about them here:
+# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+data_format = "grok"
+#
+
+patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"]
+#
+# ## Name of the output measurement.
+name_override = "grok_reader"
+#
+# ## Full path(s) to custom pattern files.
+custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"]
+#
+# ## Custom patterns can also be defined here. Put one pattern per line.
+# custom_patterns = '''
+# '''
+#
+# ## Timezone allows you to provide an override for timestamps that
+# ## don't already include an offset
+# ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
+# ##
+# ## Default: "" which renders UTC
+# ## Options are as follows:
+# ## 1. Local -- interpret based on machine localtime
+# ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
+# ## 3. UTC -- or blank/unspecified, will return timestamp in UTC
+# timezone = "Canada/Eastern"
+
+

From a931eb1c900d71fdff44b7c440d3a8e248c4e7b4 Mon Sep 17 00:00:00 2001
From: Max U
Date: Tue, 26 Jun 2018 12:06:35 -0700
Subject: [PATCH 15/18] update DATA_FORMATS_INPUT.MD to include grok

---
 docs/DATA_FORMATS_INPUT.md              | 35 ++++++++++++++++++++++++-
 plugins/inputs/reader/dev/telegraf.conf |  2 +-
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md
index c1192e72b..fa30b54c5 100644
--- a/docs/DATA_FORMATS_INPUT.md
+++ b/docs/DATA_FORMATS_INPUT.md
@@ -9,6 +9,7 @@ Telegraf is able to parse the following input data formats into metrics:
 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
+1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)
 
 Telegraf metrics, like InfluxDB
 [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -651,5 +652,37 @@ For more information about the dropwizard json format see
 # [inputs.exec.dropwizard_tag_paths]
 #   tag1 = "tags.tag1"
 #   tag2 = "tags.tag2"
+```
 
-```
\ No newline at end of file
+
+#### Grok
+Parse logstash-style "grok" patterns:
+```toml
+  [inputs.reader]
+  ## This is a list of patterns to check the given log file(s) for.
+  ## Note that adding patterns here increases processing time. The most
+  ## efficient configuration is to have one pattern per logparser.
+  ## Other common built-in patterns are:
+  ##   %{COMMON_LOG_FORMAT}   (plain apache & nginx access logs)
+  ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
+  patterns = ["%{COMBINED_LOG_FORMAT}"]
+
+  ## Name of the output measurement.
+  name_override = "apache_access_log"
+
+  ## Full path(s) to custom pattern files.
+  custom_pattern_files = []
+
+  ## Custom patterns can also be defined here. Put one pattern per line.
+  custom_patterns = '''
+  '''
+
+  ## Timezone allows you to provide an override for timestamps that
+  ## don't already include an offset
+  ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
+  ##
+  ## Default: "" which renders UTC
+  ## Options are as follows:
+  ##   1. Local             -- interpret based on machine localtime
+  ##   2. "Canada/Eastern"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
+  ##   3. UTC               -- or blank/unspecified, will return timestamp in UTC
+  timezone = "Canada/Eastern"
+  ```
\ No newline at end of file
diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf
index 76f57f0ad..e8da72c13 100644
--- a/plugins/inputs/reader/dev/telegraf.conf
+++ b/plugins/inputs/reader/dev/telegraf.conf
@@ -10,7 +10,7 @@
 # Configuration for telegraf agent
 [agent]
   ## Default data collection interval for all inputs
-  interval = "10s"
+  interval = "15s"
   ## Rounds collection interval to 'interval'
   ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

From e450b266ec6d6b9a99026caa62d4aea6a15baac6 Mon Sep 17 00:00:00 2001
From: Max U
Date: Tue, 26 Jun 2018 13:10:46 -0700
Subject: [PATCH 16/18] remove comments

---
 plugins/inputs/reader/dev/telegraf.conf | 104 ++----------------------
 plugins/inputs/reader/reader_test.go    |   6 --
 2 files changed, 5 insertions(+), 105 deletions(-)

diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf
index e8da72c13..fd89ae4f3 100644
--- a/plugins/inputs/reader/dev/telegraf.conf
+++ b/plugins/inputs/reader/dev/telegraf.conf
@@ -1,105 +1,11 @@
 
-# Global tags can be specified here in key="value" format.
-[global_tags]
-  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
-  # rack = "1a"
-  ## Environment variables can be used as tags, and throughout the config file
-  # user = "$USER"
-
-
-# Configuration for telegraf agent
-[agent]
-  ## Default data collection interval for all inputs
-  interval = "15s"
-  ## Rounds collection interval to 'interval'
-  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
-  round_interval = true
-
-  ## Telegraf will send metrics to outputs in batches of at most
-  ## metric_batch_size metrics.
-  ## This controls the size of writes that Telegraf sends to output plugins.
-  metric_batch_size = 1000
-
-  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
-  ## output, and will flush this buffer on a successful write. Oldest metrics
-  ## are dropped first when this buffer fills.
-  ## This buffer only fills when writes fail to output plugin(s).
-  metric_buffer_limit = 10000
-
-  ## Collection jitter is used to jitter the collection by a random amount.
-  ## Each plugin will sleep for a random time within jitter before collecting.
-  ## This can be used to avoid many plugins querying things like sysfs at the
-  ## same time, which can have a measurable effect on the system.
-  collection_jitter = "0s"
-
-  ## Default flushing interval for all outputs. You shouldn't set this below
-  ## interval. Maximum flush_interval will be flush_interval + flush_jitter
-  flush_interval = "10s"
-  ## Jitter the flush interval by a random amount. This is primarily to avoid
-  ## large write spikes for users running a large number of telegraf instances.
-  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
-  flush_jitter = "0s"
-
-  ## By default or when set to "0s", precision will be set to the same
-  ## timestamp order as the collection interval, with the maximum being 1s.
-  ## ie, when interval = "10s", precision will be "1s"
-  ## when interval = "250ms", precision will be "1ms"
-  ## Precision will NOT be used for service inputs. It is up to each individual
-  ## service input to set the timestamp at the appropriate precision.
-  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
-  precision = ""
-
-  ## Logging configuration:
-  ## Run telegraf with debug log messages.
-  debug = false
-  ## Run telegraf in quiet mode (error log messages only).
-  quiet = false
-  ## Specify the log file name. The empty string means to log to stderr.
-  logfile = ""
-
-  ## Override default hostname, if empty use os.Hostname()
-  hostname = ""
-  ## If set to true, do not set the "host" tag in the telegraf agent.
-  omit_hostname = false
-
-# # reload and gather from file[s] on telegraf's interval
 [[inputs.reader]]
-# ## These accept standard unix glob matching rules, but with the addition of
-# ## ** as a "super asterisk". ie:
-# ## /var/log/**.log -> recursively find all .log files in /var/log
-# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
-# ## /var/log/apache.log -> only tail the apache log file
-files = ["/var/log/test.log"]
-#
-# ## The dataformat to be read from files
-# ## Each data format has its own unique set of configuration options, read
-# ## more about them here:
-# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
-data_format = "json"
-#
-#patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"]
-#
-# ## Name of the output measurement.
-#name_override = "grok_reader"
-#
-# ## Full path(s) to custom pattern files.
-#custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"]
-#
-# ## Custom patterns can also be defined here. Put one pattern per line.
-# custom_patterns = '''
-# '''
-#
-# ## Timezone allows you to provide an override for timestamps that
-# ## don't already include an offset
-# ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
-# ##
-# ## Default: "" which renders UTC
-# ## Options are as follows:
-# ## 1. Local -- interpret based on machine localtime
-# ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
-# ## 3. UTC -- or blank/unspecified, will return timestamp in UTC
-# timezone = "Canada/Eastern"
+
+files = ["/var/log/test.log"]
+
+data_format = "json"
+
+name_override = "json_reader"
 
 
 [[outputs.file]]
   files = ["stdout"]
\ No newline at end of file
diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go
index ca076350b..8295b294b 100644
--- a/plugins/inputs/reader/reader_test.go
+++ b/plugins/inputs/reader/reader_test.go
@@ -1,7 +1,6 @@
 package reader
 
 import (
-	"log"
 	"runtime"
 	"strings"
 	"testing"
@@ -36,7 +35,6 @@ func TestJSONParserCompile(t *testing.T) {
 	assert.NoError(t, err)
 
 	r.Gather(&acc)
-	log.Printf("acc: %v", acc.Metrics[0].Tags)
 	assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
 	assert.Equal(t, 5, len(acc.Metrics[0].Fields))
 }
@@ -57,11 +55,7 @@ func TestGrokParser(t *testing.T) {
 	r.parser = nParser
 	assert.NoError(t, err)
 
-	log.Printf("path: %v", r.Filepaths[0])
 	err = r.Gather(&acc)
-	log.Printf("err: %v", err)
-	log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields)
-	log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields)
 	assert.Equal(t, 2, len(acc.Metrics))
 }
 

From 001658af30b420120488f406fd9b215e99bc38ce Mon Sep 17 00:00:00 2001
From: Max U
Date: Tue, 26 Jun 2018 13:18:43 -0700
Subject: [PATCH 17/18] condense telegraf.conf

---
 plugins/inputs/reader/dev/telegraf.conf | 10 +++-------
 plugins/inputs/reader/reader_test.go    |  1 -
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf
index fd89ae4f3..caeaae4fd 100644
--- a/plugins/inputs/reader/dev/telegraf.conf
+++ b/plugins/inputs/reader/dev/telegraf.conf
@@ -1,12 +1,8 @@
-
 [[inputs.reader]]
-
-files = ["/var/log/test.log"]
-
-data_format = "json"
-
-name_override = "json_reader"
-
+  files = ["/var/log/test.log"]
+  data_format = "json"
+  name_override = "json_reader"
 
 [[outputs.file]]
   files = ["stdout"]
\ No newline at end of file
diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go
index 8295b294b..c46eb0272 100644
--- a/plugins/inputs/reader/reader_test.go
+++ b/plugins/inputs/reader/reader_test.go
@@ -17,7 +17,6 @@ func TestRefreshFilePaths(t *testing.T) { } r.refreshFilePaths() - //log.Printf("filenames: %v", filenames) assert.Equal(t, len(r.Filenames), 2) } func TestJSONParserCompile(t *testing.T) { From 7fa27f400d63f04b90961307d52dd7e7ca39b89c Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 26 Jun 2018 13:19:53 -0700 Subject: [PATCH 18/18] more condensing --- plugins/inputs/reader/dev/telegraf.conf | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/inputs/reader/dev/telegraf.conf b/plugins/inputs/reader/dev/telegraf.conf index caeaae4fd..abaf90b83 100644 --- a/plugins/inputs/reader/dev/telegraf.conf +++ b/plugins/inputs/reader/dev/telegraf.conf @@ -1,4 +1,3 @@ - [[inputs.reader]] files = ["/var/log/test.log"] data_format = "json"
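
For readers following the series outside the repo: the parser plumbing these commits converge on can be driven directly from a short Go program. The sketch below is not part of any patch above -- the `main` harness and the relative fixture path are assumptions -- but the `parsers.Config` fields and the expected output mirror `TestJSONParserCompile` from the test file patched throughout this series.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	// Build a JSON parser the same way the reader plugin's compileParser
	// does in this series: DataFormat plus TagKeys; MetricName mirrors
	// the name_override used in the dev config.
	cfg := parsers.Config{
		DataFormat: "json",
		MetricName: "json_reader",
		TagKeys:    []string{"parent_ignored_child"},
	}
	parser, err := parsers.NewParser(&cfg)
	if err != nil {
		log.Fatalf("building parser: %v", err)
	}

	// json_a.log is the fixture added in the first patch of this series
	// (path assumes the telegraf repo root as working directory).
	contents, err := ioutil.ReadFile("plugins/inputs/reader/testfiles/json_a.log")
	if err != nil {
		log.Fatalf("reading fixture: %v", err)
	}

	metrics, err := parser.Parse(contents)
	if err != nil {
		log.Fatalf("parsing: %v", err)
	}

	// Per TestJSONParserCompile, this yields one metric with five numeric
	// fields and the single tag parent_ignored_child=hi.
	for _, m := range metrics {
		fmt.Println(m.Name(), m.Tags(), m.Fields())
	}
}
```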
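The grok path added alongside these patches goes through the same `parsers.Config` surface. Another minimal sketch under the same caveats: the sample log line is invented, and it assumes the built-in `%{COMMON_LOG_FORMAT}` pattern that `TestGrokParser` relies on is available to the parser.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	// DataFormat "grok" plus Patterns is exactly what TestGrokParser
	// passes to parsers.NewParser in this series.
	cfg := parsers.Config{
		DataFormat: "grok",
		Patterns:   []string{"%{COMMON_LOG_FORMAT}"},
	}
	parser, err := parsers.NewParser(&cfg)
	if err != nil {
		log.Fatalf("building parser: %v", err)
	}

	// An illustrative Apache common-log line.
	line := `127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`

	metrics, err := parser.Parse([]byte(line))
	if err != nil {
		log.Fatalf("parsing: %v", err)
	}

	// Each matched line becomes one metric; fields and tags come from
	// the named captures in the grok pattern.
	for _, m := range metrics {
		fmt.Println(m.Name(), m.Tags(), m.Fields())
	}
}
```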