Add file input plugin and grok parser (#4332)

maxunt 2018-07-13 23:22:59 -07:00 committed by Daniel Nelson
parent 3f87e5bf57
commit 774a9f0492
24 changed files with 558 additions and 154 deletions

View File

@ -153,6 +153,7 @@ configuration options.
* [exec](./plugins/inputs/exec) (generic executable plugin, supports JSON, influx, graphite and nagios)
* [fail2ban](./plugins/inputs/fail2ban)
* [fibaro](./plugins/inputs/fibaro)
* [file](./plugins/inputs/file)
* [filestat](./plugins/inputs/filestat)
* [fluentd](./plugins/inputs/fluentd)
* [graylog](./plugins/inputs/graylog)

View File

@ -9,6 +9,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@ -657,5 +658,107 @@ For more information about the dropwizard json format see
# [inputs.exec.dropwizard_tag_paths]
#   tag1 = "tags.tag1"
#   tag2 = "tags.tag2"
```
#### Grok
This parser applies logstash-style "grok" patterns to line-delimited text. Patterns can be supplied in `grok_patterns`, defined inline via `grok_custom_patterns`, or loaded from the files listed in `grok_custom_pattern_files`.
```
# View logstash grok pattern docs here:
#   https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported; they can be viewed here:
#   https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
#   string   (default if nothing is specified)
#   int
#   float
#   duration (i.e. 5.23ms gets converted to int nanoseconds)
#   tag      (converts the field into a tag)
#   drop     (drops the field completely)
# Timestamp modifiers:
#   ts-ansic       ("Mon Jan _2 15:04:05 2006")
#   ts-unix        ("Mon Jan _2 15:04:05 MST 2006")
#   ts-ruby        ("Mon Jan 02 15:04:05 -0700 2006")
#   ts-rfc822      ("02 Jan 06 15:04 MST")
#   ts-rfc822z     ("02 Jan 06 15:04 -0700")
#   ts-rfc850      ("Monday, 02-Jan-06 15:04:05 MST")
#   ts-rfc1123     ("Mon, 02 Jan 2006 15:04:05 MST")
#   ts-rfc1123z    ("Mon, 02 Jan 2006 15:04:05 -0700")
#   ts-rfc3339     ("2006-01-02T15:04:05Z07:00")
#   ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
#   ts-httpd       ("02/Jan/2006:15:04:05 -0700")
#   ts-epoch       (seconds since unix epoch)
#   ts-epochnano   (nanoseconds since unix epoch)
#   ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
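# As a concrete sketch (layout borrowed from this commit's test patterns),
# a timestamp like "04/06/2016--12:41:45" could be captured and parsed with:
#   TEST_TIMESTAMP %{MONTHDAY}/%{MONTHNUM}/%{YEAR}--%{TIME}
#   %{TEST_TIMESTAMP:timestamp:ts-"02/01/2006--15:04:05"}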
# Example log file pattern, example log looks like this:
#   [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
#   NUMBER is a builtin logstash grok pattern matching float & int numbers.
#   [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
#   s is also regex; the pattern must end in "s".
#   So DURATION will match something like '5.324ms' or '6.1µs' or '10s'.
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
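# For the example line above, assuming the measurement is named "example_log",
# EXAMPLE_LOG would produce roughly this metric in influx line protocol
# (response_code becomes a tag; the duration becomes integer nanoseconds):
#   example_log,response_code=200 clientip="192.168.1.1",myfloat=1.25,response_time_ns=5432i 1465040505000000000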
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs; this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
```
#### Grok Configuration:
```toml
[[inputs.file]]
  ## This is a list of patterns to check the given log file(s) for.
  ## Note that adding patterns here increases processing time. The most
  ## efficient configuration is to have one pattern per input.
  ## Other common built-in patterns are:
  ##   %{COMMON_LOG_FORMAT}   (plain apache & nginx access logs)
  ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
  grok_patterns = ["%{COMBINED_LOG_FORMAT}"]

  ## Name of the output measurement.
  grok_name_override = "apache_access_log"

  ## Full path(s) to custom pattern files.
  grok_custom_pattern_files = []

  ## Custom patterns can also be defined here. Put one pattern per line.
  grok_custom_patterns = '''
  '''

  ## Timezone allows you to provide an override for timestamps that
  ## don't already include an offset,
  ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
  ##
  ## Default: "" which renders UTC
  ## Options are as follows:
  ##   1. Local             -- interpret based on machine localtime
  ##   2. "Canada/Eastern"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
  ##   3. UTC               -- or blank/unspecified, will return timestamp in UTC
  grok_timezone = "Canada/Eastern"
```

View File

@ -1346,6 +1346,59 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
        }
    }
    // for grok data_format
    if node, ok := tbl.Fields["grok_named_patterns"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if ary, ok := kv.Value.(*ast.Array); ok {
                for _, elem := range ary.Value {
                    if str, ok := elem.(*ast.String); ok {
                        c.GrokNamedPatterns = append(c.GrokNamedPatterns, str.Value)
                    }
                }
            }
        }
    }

    if node, ok := tbl.Fields["grok_patterns"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if ary, ok := kv.Value.(*ast.Array); ok {
                for _, elem := range ary.Value {
                    if str, ok := elem.(*ast.String); ok {
                        c.GrokPatterns = append(c.GrokPatterns, str.Value)
                    }
                }
            }
        }
    }

    if node, ok := tbl.Fields["grok_custom_patterns"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if str, ok := kv.Value.(*ast.String); ok {
                c.GrokCustomPatterns = str.Value
            }
        }
    }

    if node, ok := tbl.Fields["grok_custom_pattern_files"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if ary, ok := kv.Value.(*ast.Array); ok {
                for _, elem := range ary.Value {
                    if str, ok := elem.(*ast.String); ok {
                        c.GrokCustomPatternFiles = append(c.GrokCustomPatternFiles, str.Value)
                    }
                }
            }
        }
    }

    if node, ok := tbl.Fields["grok_timezone"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if str, ok := kv.Value.(*ast.String); ok {
                c.GrokTimeZone = str.Value
            }
        }
    }
    c.MetricName = name

    delete(tbl.Fields, "data_format")
@ -1362,6 +1415,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_time_format")
delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tags_path")
delete(tbl.Fields, "dropwizard_tag_paths") delete(tbl.Fields, "dropwizard_tag_paths")
delete(tbl.Fields, "grok_named_patterns")
delete(tbl.Fields, "grok_patterns")
delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
return parsers.NewParser(c) return parsers.NewParser(c)
} }

View File

@ -30,6 +30,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/fail2ban" _ "github.com/influxdata/telegraf/plugins/inputs/fail2ban"
_ "github.com/influxdata/telegraf/plugins/inputs/fibaro" _ "github.com/influxdata/telegraf/plugins/inputs/fibaro"
_ "github.com/influxdata/telegraf/plugins/inputs/file"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat" _ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/fluentd" _ "github.com/influxdata/telegraf/plugins/inputs/fluentd"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/graylog"

View File

@ -0,0 +1,25 @@
# File Input Plugin
The file plugin updates a list of files every interval and parses the contents
using the selected [input data format](/docs/DATA_FORMATS_INPUT.md).
Files will always be read in their entirety; if you wish to tail/follow a file,
use the [tail input plugin](/plugins/inputs/tail) instead.
### Configuration:
```toml
[[inputs.file]]
  ## Files to parse each interval.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk", i.e.:
  ##   /var/log/**.log     -> recursively find all .log files in /var/log
  ##   /var/log/*/*.log    -> find all .log files with a parent dir in /var/log
  ##   /var/log/apache.log -> only read the apache log file
  files = ["/var/log/apache/access.log"]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ##   https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
```
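As a sketch of the end-to-end behavior (hypothetical file contents and values): if `files` pointed at a file holding a single line of influx line protocol, such as

```
cpu,host=web01 usage_idle=98.2 1436178558000000000
```

the plugin would parse and emit that same point on every gather interval, since the file is re-read in full each time. Files that are atomically rewritten in place (status dumps, exported snapshots) fit this model well; append-only logs are better served by `tail`.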

View File

@ -0,0 +1,13 @@
version: '3'

services:
  telegraf:
    image: glinton/scratch
    volumes:
      - ./telegraf.conf:/telegraf.conf
      - ../../../../telegraf:/telegraf
      - ./json_a.log:/var/log/test.log
    entrypoint:
      - /telegraf
      - --config
      - /telegraf.conf

View File

@ -0,0 +1,14 @@
{
  "parent": {
    "child": 3.0,
    "ignored_child": "hi"
  },
  "ignored_null": null,
  "integer": 4,
  "list": [3, 4],
  "ignored_parent": {
    "another_ignored_null": null,
    "ignored_string": "hello, world!"
  },
  "another_list": [4]
}

View File

@ -0,0 +1,7 @@
[[inputs.file]]
  files = ["/var/log/test.log"]
  data_format = "json"
  name_override = "json_file"

[[outputs.file]]
  files = ["stdout"]

plugins/inputs/file/file.go Normal file
View File

@ -0,0 +1,102 @@
package file

import (
    "fmt"
    "io/ioutil"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/internal/globpath"
    "github.com/influxdata/telegraf/plugins/inputs"
    "github.com/influxdata/telegraf/plugins/parsers"
)

type File struct {
    Files         []string `toml:"files"`
    FromBeginning bool

    parser parsers.Parser

    filenames []string
}

const sampleConfig = `
  ## Files to parse each interval.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk", i.e.:
  ##   /var/log/**.log     -> recursively find all .log files in /var/log
  ##   /var/log/*/*.log    -> find all .log files with a parent dir in /var/log
  ##   /var/log/apache.log -> only read the apache log file
  files = ["/var/log/apache/access.log"]

  ## The data format to be read from the files.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ##   https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`

// SampleConfig returns the default configuration of the Input
func (f *File) SampleConfig() string {
    return sampleConfig
}

func (f *File) Description() string {
    return "reload and gather from file[s] on telegraf's interval"
}
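// Gather re-globs the configured paths on every interval, reads each matched
// file in full, and parses the contents with the configured parser.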
func (f *File) Gather(acc telegraf.Accumulator) error {
    err := f.refreshFilePaths()
    if err != nil {
        return err
    }
    for _, k := range f.filenames {
        metrics, err := f.readMetric(k)
        if err != nil {
            return err
        }

        for _, m := range metrics {
            acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
        }
    }
    return nil
}

func (f *File) SetParser(p parsers.Parser) {
    f.parser = p
}
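// refreshFilePaths expands the configured globs into the current set of
// matching filenames; a glob that matches nothing is treated as an error.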
func (f *File) refreshFilePaths() error {
    var allFiles []string
    for _, file := range f.Files {
        g, err := globpath.Compile(file)
        if err != nil {
            return fmt.Errorf("could not compile glob %v: %v", file, err)
        }
        files := g.Match()
        if len(files) == 0 {
            return fmt.Errorf("could not find file: %v", file)
        }
        for k := range files {
            allFiles = append(allFiles, k)
        }
    }

    f.filenames = allFiles
    return nil
}
func (f *File) readMetric(filename string) ([]telegraf.Metric, error) {
    fileContents, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, fmt.Errorf("could not read file %v: %s", filename, err)
    }
    return f.parser.Parse(fileContents)
}

func init() {
    inputs.Add("file", func() telegraf.Input {
        return &File{}
    })
}

View File

@ -0,0 +1,61 @@
package file

import (
    "os"
    "path/filepath"
    "testing"

    "github.com/influxdata/telegraf/plugins/parsers"
    "github.com/influxdata/telegraf/testutil"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestRefreshFilePaths(t *testing.T) {
    wd, err := os.Getwd()
    require.NoError(t, err)

    r := File{
        Files: []string{filepath.Join(wd, "testfiles/**.log")},
    }

    err = r.refreshFilePaths()
    require.NoError(t, err)
    assert.Equal(t, len(r.filenames), 2)
}
func TestJSONParserCompile(t *testing.T) {
    var acc testutil.Accumulator
    wd, err := os.Getwd()
    require.NoError(t, err)
    r := File{
        Files: []string{filepath.Join(wd, "testfiles/json_a.log")},
    }
    parserConfig := parsers.Config{
        DataFormat: "json",
        TagKeys:    []string{"parent_ignored_child"},
    }
    nParser, err := parsers.NewParser(&parserConfig)
    require.NoError(t, err)
    r.parser = nParser

    require.NoError(t, r.Gather(&acc))
    assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
    assert.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
    wd, err := os.Getwd()
    require.NoError(t, err)
    var acc testutil.Accumulator
    r := File{
        Files: []string{filepath.Join(wd, "testfiles/grok_a.log")},
    }

    parserConfig := parsers.Config{
        DataFormat:   "grok",
        GrokPatterns: []string{"%{COMMON_LOG_FORMAT}"},
    }

    nParser, err := parsers.NewParser(&parserConfig)
    require.NoError(t, err)
    r.parser = nParser

    require.NoError(t, r.Gather(&acc))
    assert.Equal(t, 2, len(acc.Metrics))
}

View File

@ -0,0 +1,2 @@
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45

View File

@ -0,0 +1,14 @@
{
  "parent": {
    "child": 3.0,
    "ignored_child": "hi"
  },
  "ignored_null": null,
  "integer": 4,
  "list": [3, 4],
  "ignored_parent": {
    "another_ignored_null": null,
    "ignored_string": "hello, world!"
  },
  "another_list": [4]
}

View File

@ -1,5 +1,9 @@
# Logparser Input Plugin
### **Deprecated in version 1.8**: Please use the
[tail](/plugins/inputs/tail) plugin with the `grok`
[data format](/docs/DATA_FORMATS_INPUT.md).
The `logparser` plugin streams and parses the given logfiles. Currently it
has the capability of parsing "grok" patterns from logfiles, which also supports
regex patterns.
@ -8,6 +12,9 @@ regex patterns.
```toml
[[inputs.logparser]]
  ## DEPRECATED: The `logparser` plugin is deprecated in 1.8. Please use the
  ## `tail` plugin with the grok data_format instead.

  ## Log files to parse.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk", i.e.:

Binary file not shown.

View File

@ -3,9 +3,7 @@
package logparser

import (
-    "fmt"
    "log"
-    "reflect"
    "strings"
    "sync"
@ -14,9 +12,8 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
// Parsers // Parsers
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
) )
const (
@ -24,9 +21,13 @@
)
// LogParser is the primary interface for the plugin
-type LogParser interface {
-    ParseLine(line string) (telegraf.Metric, error)
-    Compile() error
-}
+type GrokConfig struct {
+    MeasurementName    string `toml:"measurement"`
+    Patterns           []string
+    NamedPatterns      []string
+    CustomPatterns     string
+    CustomPatternFiles []string
+    TimeZone           string
+}
type logEntry struct {
@ -45,14 +46,17 @@ type LogParserPlugin struct {
    done chan struct{}
    wg   sync.WaitGroup
    acc  telegraf.Accumulator
-    parsers []LogParser

    sync.Mutex

-    GrokParser *grok.Parser `toml:"grok"`
+    GrokParser parsers.Parser
+    GrokConfig GrokConfig `toml:"grok"`
}
const sampleConfig = `
+  ## DEPRECATED: The 'logparser' plugin is deprecated in 1.8. Please use the
+  ## 'tail' plugin with the grok data_format as a replacement.

  ## Log files to parse.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk", i.e.:
@ -122,6 +126,9 @@ func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
// Start kicks off collection of stats for the plugin
func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
+    log.Println("W! DEPRECATED: The logparser plugin is deprecated in 1.8. " +
+        "Please use the tail plugin with the grok data_format as a replacement.")

    l.Lock()
    defer l.Unlock()
@ -131,32 +138,19 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
    l.tailers = make(map[string]*tail.Tail)

    // Looks for fields which implement LogParser interface
-    l.parsers = []LogParser{}
-    s := reflect.ValueOf(l).Elem()
-    for i := 0; i < s.NumField(); i++ {
-        f := s.Field(i)
-
-        if !f.CanInterface() {
-            continue
-        }
-
-        if lpPlugin, ok := f.Interface().(LogParser); ok {
-            if reflect.ValueOf(lpPlugin).IsNil() {
-                continue
-            }
-            l.parsers = append(l.parsers, lpPlugin)
-        }
-    }
-
-    if len(l.parsers) == 0 {
-        return fmt.Errorf("logparser input plugin: no parser defined")
-    }
-
-    // compile log parser patterns:
-    for _, parser := range l.parsers {
-        if err := parser.Compile(); err != nil {
-            return err
-        }
-    }
+    config := &parsers.Config{
+        GrokPatterns:           l.GrokConfig.Patterns,
+        GrokNamedPatterns:      l.GrokConfig.NamedPatterns,
+        GrokCustomPatterns:     l.GrokConfig.CustomPatterns,
+        GrokCustomPatternFiles: l.GrokConfig.CustomPatternFiles,
+        GrokTimeZone:           l.GrokConfig.TimeZone,
+        DataFormat:             "grok",
+    }
+
+    var err error
+    l.GrokParser, err = parsers.NewParser(config)
+    if err != nil {
+        return err
+    }
    l.wg.Add(1)
@ -251,8 +245,8 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
    }
}

-// parser is launched as a goroutine to watch the l.lines channel.
-// when a line is available, parser parses it and adds the metric(s) to the
+// parse is launched as a goroutine to watch the l.lines channel.
+// when a line is available, parse parses it and adds the metric(s) to the
// accumulator.
func (l *LogParserPlugin) parser() {
    defer l.wg.Done()
@ -269,18 +263,17 @@ func (l *LogParserPlugin) parser() {
                continue
            }
        }
-        for _, parser := range l.parsers {
-            m, err = parser.ParseLine(entry.line)
-            if err == nil {
-                if m != nil {
-                    tags := m.Tags()
-                    tags["path"] = entry.path
-                    l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
-                }
-            } else {
-                log.Println("E! Error parsing log line: " + err.Error())
-            }
-        }
+        m, err = l.GrokParser.ParseLine(entry.line)
+        if err == nil {
+            if m != nil {
+                tags := m.Tags()
+                tags["path"] = entry.path
+                l.acc.AddFields(l.GrokConfig.MeasurementName, m.Fields(), tags, m.Time())
+            }
+        } else {
+            log.Println("E! Error parsing log line: " + err.Error())
+        }
    }
}
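The `toml:"grok"` tag on `GrokConfig` implies a nested configuration table; a hedged sketch of what that looks like (key names inferred from the struct fields and the plugin README, values illustrative):

```toml
[[inputs.logparser]]
  files = ["/var/log/apache/access.log"]
  from_beginning = true

  [inputs.logparser.grok]
    measurement = "apache_access_log"
    patterns = ["%{COMBINED_LOG_FORMAT}"]
    custom_pattern_files = []
    custom_patterns = '''
    '''
    timezone = "UTC"
```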

View File

@ -9,8 +9,6 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -26,15 +24,14 @@ func TestStartNoParsers(t *testing.T) {
func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
    thisdir := getCurrentDir()
-    p := &grok.Parser{
-        Patterns:           []string{"%{FOOBAR}"},
-        CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
-    }

    logparser := &LogParserPlugin{
        FromBeginning: true,
        Files:         []string{thisdir + "grok/testdata/*.log"},
-        GrokParser:    p,
+        GrokConfig: GrokConfig{
+            Patterns:           []string{"%{FOOBAR}"},
+            CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
+        },
    }

    acc := testutil.Accumulator{}
@ -44,20 +41,19 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
func TestGrokParseLogFiles(t *testing.T) {
    thisdir := getCurrentDir()
-    p := &grok.Parser{
-        Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
-        CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
-    }

    logparser := &LogParserPlugin{
+        GrokConfig: GrokConfig{
+            MeasurementName:    "logparser_grok",
+            Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
+            CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
+        },
        FromBeginning: true,
        Files:         []string{thisdir + "grok/testdata/*.log"},
-        GrokParser:    p,
    }

    acc := testutil.Accumulator{}
    assert.NoError(t, logparser.Start(&acc))

    acc.Wait(2)
    logparser.Stop()
@ -91,15 +87,15 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
    assert.NoError(t, err)

    thisdir := getCurrentDir()
-    p := &grok.Parser{
-        Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
-        CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
-    }

    logparser := &LogParserPlugin{
        FromBeginning: true,
        Files:         []string{emptydir + "/*.log"},
-        GrokParser:    p,
+        GrokConfig: GrokConfig{
+            MeasurementName:    "logparser_grok",
+            Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
+            CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
+        },
    }

    acc := testutil.Accumulator{}
@ -130,16 +126,15 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
    thisdir := getCurrentDir()
-    p := &grok.Parser{
-        Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
-        CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
-    }
-    assert.NoError(t, p.Compile())

    logparser := &LogParserPlugin{
        FromBeginning: true,
        Files:         []string{thisdir + "grok/testdata/test_a.log"},
-        GrokParser:    p,
+        GrokConfig: GrokConfig{
+            MeasurementName:    "logparser_grok",
+            Patterns:           []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
+            CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
+        },
    }

    acc := testutil.Accumulator{}

View File

@ -68,10 +68,11 @@ type Parser struct {
    // specified by the user in Patterns.
    // They will look like:
    //   GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
-    namedPatterns      []string
+    NamedPatterns      []string
    CustomPatterns     string
    CustomPatternFiles []string
    Measurement        string
+    DefaultTags        map[string]string

    // Timezone is an optional component to help render log dates to
    // your chosen zone.
@ -133,7 +134,7 @@ func (p *Parser) Compile() error {
    // Give Patterns fake names so that they can be treated as named
    // "custom patterns"
-    p.namedPatterns = make([]string, 0, len(p.Patterns))
+    p.NamedPatterns = make([]string, 0, len(p.Patterns))
    for i, pattern := range p.Patterns {
        pattern = strings.TrimSpace(pattern)
        if pattern == "" {
@ -141,10 +142,10 @@ func (p *Parser) Compile() error {
        }
        name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
        p.CustomPatterns += "\n" + name + " " + pattern + "\n"
-        p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
+        p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}")
    }

-    if len(p.namedPatterns) == 0 {
+    if len(p.NamedPatterns) == 0 {
        return fmt.Errorf("pattern required")
    }
@ -167,10 +168,6 @@ func (p *Parser) Compile() error {
        p.addCustomPatterns(scanner)
    }

-    if p.Measurement == "" {
-        p.Measurement = "logparser_grok"
-    }
    p.loc, err = time.LoadLocation(p.Timezone)
    if err != nil {
        log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
@ -191,7 +188,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
    var values map[string]string
    // the matching pattern string
    var patternName string
-    for _, pattern := range p.namedPatterns {
+    for _, pattern := range p.NamedPatterns {
        if values, err = p.g.Parse(pattern, line); err != nil {
            return nil, err
        }
@ -208,6 +205,12 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
    fields := make(map[string]interface{})
    tags := make(map[string]string)

+    // add default tags
+    for k, v := range p.DefaultTags {
+        tags[k] = v
+    }

    timestamp := time.Now()
    for k, v := range values {
        if k == "" || v == "" {
@ -335,9 +338,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
        case DROP:
            // goodbye!
        default:
-            // Replace commas with dot character
            v = strings.Replace(v, ",", ".", -1)
            ts, err := time.ParseInLocation(t, v, p.loc)
            if err == nil {
                timestamp = ts
@ -354,6 +355,29 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
    return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
    scanner := bufio.NewScanner(strings.NewReader(string(buf)))
    var lines []string
    for scanner.Scan() {
        lines = append(lines, scanner.Text())
    }

    var metrics []telegraf.Metric
    for _, line := range lines {
        m, err := p.ParseLine(line)
        if err != nil {
            return nil, err
        }
        // ParseLine may return a nil metric for lines that match no pattern;
        // skip those rather than appending nil entries to the result.
        if m == nil {
            continue
        }
        metrics = append(metrics, m)
    }

    return metrics, nil
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
    p.DefaultTags = tags
}
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {
    for scanner.Scan() {
        line := strings.TrimSpace(scanner.Text())

View File

@ -4,79 +4,18 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var benchM telegraf.Metric func TestGrokParse(t *testing.T) {
parser := Parser{
func Benchmark_ParseLine_CommonLogFormat(b *testing.B) { Measurement: "t_met",
p := &Parser{ Patterns: []string{"%{COMMON_LOG_FORMAT}"},
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
_ = p.Compile() parser.Compile()
_, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`))
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
}
benchM = m
}
func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) {
p := &Parser{
Patterns: []string{"%{COMBINED_LOG_FORMAT}"},
}
_ = p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "-" "Mozilla"`)
}
benchM = m
}
func Benchmark_ParseLine_CustomPattern(b *testing.B) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
_ = p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
}
benchM = m
}
// Test a very simple parse pattern.
func TestSimpleParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TESTLOG}"},
CustomPatterns: `
TESTLOG %{NUMBER:num:int} %{WORD:client}
`,
}
assert.NoError(t, p.Compile())
m, err := p.ParseLine(`142 bot`)
assert.NoError(t, err) assert.NoError(t, err)
require.NotNil(t, m)
assert.Equal(t,
map[string]interface{}{
"num": int64(142),
"client": "bot",
},
m.Fields())
} }
// Verify that patterns with a regex lookahead fail at compile time.
@ -96,8 +35,7 @@ func TestParsePatternsWithLookahead(t *testing.T) {
func TestMeasurementName(t *testing.T) {
    p := &Parser{
-        Measurement: "my_web_log",
        Patterns:    []string{"%{COMMON_LOG_FORMAT}"},
    }

    assert.NoError(t, p.Compile())
@ -116,13 +54,11 @@ func TestMeasurementName(t *testing.T) {
        },
        m.Fields())
    assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
-    assert.Equal(t, "my_web_log", m.Name())
}
func TestCLF_IPv6(t *testing.T) {
    p := &Parser{
-        Measurement: "my_web_log",
        Patterns:    []string{"%{COMMON_LOG_FORMAT}"},
    }

    assert.NoError(t, p.Compile())
@ -140,7 +76,6 @@ func TestCLF_IPv6(t *testing.T) {
        },
        m.Fields())
    assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
-    assert.Equal(t, "my_web_log", m.Name())

    m, err = p.ParseLine(`::1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
    require.NotNil(t, m)
@ -156,7 +91,6 @@ func TestCLF_IPv6(t *testing.T) {
        },
        m.Fields())
    assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
-    assert.Equal(t, "my_web_log", m.Name())
}
func TestCustomInfluxdbHttpd(t *testing.T) {

plugins/parsers/grok/testdata/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,14 @@
# Test A log line:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A \[%{HTTPDATE:timestamp:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} %{NUMBER:myint:int}
# Test B log line:
# [04/06/2016--12:41:45] 1.25 mystring dropme nomodifier
TEST_TIMESTAMP %{MONTHDAY}/%{MONTHNUM}/%{YEAR}--%{TIME}
TEST_LOG_B \[%{TEST_TIMESTAMP:timestamp:ts-"02/01/2006--15:04:05"}\] %{NUMBER:myfloat:float} %{WORD:mystring:string} %{WORD:dropme:drop} %{WORD:nomodifier}
TEST_TIMESTAMP %{MONTHDAY}/%{MONTHNUM}/%{YEAR}--%{TIME}
TEST_LOG_BAD \[%{TEST_TIMESTAMP:timestamp:ts-"02/01/2006--15:04:05"}\] %{NUMBER:myfloat:float} %{WORD:mystring:int} %{WORD:dropme:drop} %{WORD:nomodifier}

View File

@ -0,0 +1 @@
[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101

View File

@ -0,0 +1 @@
[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -90,6 +91,13 @@ type Config struct {
    // an optional map containing tag names as keys and json paths to retrieve the tag values from as values
    // used if TagsPath is empty or doesn't return any tags
    DropwizardTagPathsMap map[string]string

    // grok patterns
    GrokPatterns           []string
    GrokNamedPatterns      []string
    GrokCustomPatterns     string
    GrokCustomPatternFiles []string
    GrokTimeZone           string
}
// NewParser returns a Parser interface based on the given config.
@ -123,12 +131,38 @@ func NewParser(config *Config) (Parser, error) {
            config.DefaultTags,
            config.Separator,
            config.Templates)
    case "grok":
        parser, err = newGrokParser(
            config.MetricName,
            config.GrokPatterns,
            config.GrokNamedPatterns,
            config.GrokCustomPatterns,
            config.GrokCustomPatternFiles,
            config.GrokTimeZone)
    default:
        err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
    }
    return parser, err
}
func newGrokParser(metricName string,
    patterns []string,
    nPatterns []string,
    cPatterns string,
    cPatternFiles []string,
    tZone string) (Parser, error) {
    parser := grok.Parser{
        Measurement:        metricName,
        Patterns:           patterns,
        NamedPatterns:      nPatterns,
        CustomPatterns:     cPatterns,
        CustomPatternFiles: cPatternFiles,
        Timezone:           tZone,
    }

    err := parser.Compile()
    return &parser, err
}
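To close the loop, a minimal sketch of exercising the grok path through the parser registry (the pattern and log line are borrowed from the tests in this commit; the `main` wrapper and error handling are illustrative assumptions):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	p, err := parsers.NewParser(&parsers.Config{
		DataFormat:   "grok",
		MetricName:   "apache_access_log",
		GrokPatterns: []string{"%{COMMON_LOG_FORMAT}"},
	})
	if err != nil {
		panic(err)
	}

	line := `127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`
	metrics, err := p.Parse([]byte(line))
	if err != nil {
		panic(err)
	}
	// expect one metric tagged verb=GET, resp_code=200 with fields like resp_bytes=2326
	fmt.Println(len(metrics), metrics[0].Tags())
}
```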
func NewJSONParser(
    metricName string,
    tagKeys []string,