Add timezone support to logparser timestamps (#2882)

This commit is contained in:
Sebastian Borza 2017-06-05 16:45:11 -05:00 committed by Daniel Nelson
parent a47e6e6efe
commit 035905d65e
6 changed files with 259 additions and 16 deletions

View File

@ -15,7 +15,10 @@ regex patterns.
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file ## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"] files = ["/var/log/apache/access.log"]
## Read file from beginning.
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false from_beginning = false
## Parse logstash-style "grok" patterns: ## Parse logstash-style "grok" patterns:
@ -28,13 +31,27 @@ regex patterns.
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{COMBINED_LOG_FORMAT}"] patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name. ## Name of the outputted measurement name.
measurement = "apache_access_log" measurement = "apache_access_log"
## Full path(s) to custom pattern files. ## Full path(s) to custom pattern files.
custom_pattern_files = [] custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line. ## Custom patterns can also be defined here. Put one pattern per line.
custom_patterns = ''' custom_patterns = '''
''' '''
## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45 data one two 5.43µs
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
timezone = "Canada/Eastern"
``` ```
### Grok Parser ### Grok Parser
@ -125,6 +142,13 @@ Wed Apr 12 13:10:34 PST 2017 value=42
''' '''
``` ```
For cases where the timestamp itself is without offset, the `timezone` config var is available
to denote an offset. By default (with `timezone` either omit, blank or set to `"UTC"`), the times
are processed as if in the UTC timezone. If specified as `timezone = "Local"`, the timestamp
will be processed based on the current machine timezone configuration. Lastly, if using a
timezone from the list of Unix [timezones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones), the logparser grok will attempt to offset
the timestamp accordingly. See test cases for more detailed examples.
#### TOML Escaping #### TOML Escaping
When saving patterns to the configuration file, keep in mind the different TOML When saving patterns to the configuration file, keep in mind the different TOML

View File

@ -59,6 +59,7 @@ var (
patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
) )
// Parser is the primary struct to handle and grok-patterns defined in the config toml
type Parser struct { type Parser struct {
Patterns []string Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns // namedPatterns is a list of internally-assigned names to the patterns
@ -70,6 +71,16 @@ type Parser struct {
CustomPatternFiles []string CustomPatternFiles []string
Measurement string Measurement string
// Timezone is an optional component to help render log dates to
// your chosen zone.
// Default: "" which renders UTC
// Options are as follows:
// 1. Local -- interpret based on machine localtime
// 2. "America/Chicago" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
// 3. UTC -- or blank/unspecified, will return timestamp in UTC
Timezone string
loc *time.Location
// typeMap is a map of patterns -> capture name -> modifier, // typeMap is a map of patterns -> capture name -> modifier,
// ie, { // ie, {
// "%{TESTLOG}": // "%{TESTLOG}":
@ -105,6 +116,7 @@ type Parser struct {
tsModder *tsModder tsModder *tsModder
} }
// Compile is a bound method to Parser which will process the options for our parser
func (p *Parser) Compile() error { func (p *Parser) Compile() error {
p.typeMap = make(map[string]map[string]string) p.typeMap = make(map[string]map[string]string)
p.tsMap = make(map[string]map[string]string) p.tsMap = make(map[string]map[string]string)
@ -142,9 +154,9 @@ func (p *Parser) Compile() error {
// Parse any custom pattern files supplied. // Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles { for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename) file, fileErr := os.Open(filename)
if err != nil { if fileErr != nil {
return err return fileErr
} }
scanner := bufio.NewScanner(bufio.NewReader(file)) scanner := bufio.NewScanner(bufio.NewReader(file))
@ -155,9 +167,16 @@ func (p *Parser) Compile() error {
p.Measurement = "logparser_grok" p.Measurement = "logparser_grok"
} }
p.loc, err = time.LoadLocation(p.Timezone)
if err != nil {
log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
p.loc, _ = time.LoadLocation("UTC")
}
return p.compileCustomPatterns() return p.compileCustomPatterns()
} }
// ParseLine is the primary function to process individual lines, returning the metrics
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var err error var err error
// values are the parsed fields from the log line // values are the parsed fields from the log line
@ -251,7 +270,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var foundTs bool var foundTs bool
// first try timestamp layouts that we've already found // first try timestamp layouts that we've already found
for _, layout := range p.foundTsLayouts { for _, layout := range p.foundTsLayouts {
ts, err := time.Parse(layout, v) ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts
foundTs = true foundTs = true
@ -262,7 +281,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
// layouts. // layouts.
if !foundTs { if !foundTs {
for _, layout := range timeLayouts { for _, layout := range timeLayouts {
ts, err := time.Parse(layout, v) ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts
foundTs = true foundTs = true
@ -280,7 +299,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
ts, err := time.Parse(t, v) ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts
} else { } else {

View File

@ -16,7 +16,7 @@ func Benchmark_ParseLine_CommonLogFormat(b *testing.B) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
p.Compile() _ = p.Compile()
var m telegraf.Metric var m telegraf.Metric
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
@ -29,7 +29,7 @@ func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{COMBINED_LOG_FORMAT}"}, Patterns: []string{"%{COMBINED_LOG_FORMAT}"},
} }
p.Compile() _ = p.Compile()
var m telegraf.Metric var m telegraf.Metric
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
@ -48,7 +48,7 @@ func Benchmark_ParseLine_CustomPattern(b *testing.B) {
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`, `,
} }
p.Compile() _ = p.Compile()
var m telegraf.Metric var m telegraf.Metric
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
@ -707,3 +707,183 @@ func TestShortPatternRegression(t *testing.T) {
}, },
metric.Fields()) metric.Fields())
} }
func TestTimezoneEmptyCompileFileAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{"./testdata/test-patterns"},
Timezone: "",
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
metricA.Fields())
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
assert.Equal(t, int64(1465040505000000000), metricA.UnixNano())
metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, int64(1465044105000000000), metricB.UnixNano())
}
func TestTimezoneMalformedCompileFileAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{"./testdata/test-patterns"},
Timezone: "Something/Weird",
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
metricA.Fields())
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
assert.Equal(t, int64(1465040505000000000), metricA.UnixNano())
metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, int64(1465044105000000000), metricB.UnixNano())
}
func TestTimezoneEuropeCompileFileAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{"./testdata/test-patterns"},
Timezone: "Europe/Berlin",
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
metricA.Fields())
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
assert.Equal(t, int64(1465040505000000000), metricA.UnixNano())
metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, int64(1465036905000000000), metricB.UnixNano())
}
func TestTimezoneAmericasCompileFileAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{"./testdata/test-patterns"},
Timezone: "Canada/Eastern",
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
metricA.Fields())
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
assert.Equal(t, int64(1465040505000000000), metricA.UnixNano())
metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, int64(1465058505000000000), metricB.UnixNano())
}
func TestTimezoneLocalCompileFileAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{"./testdata/test-patterns"},
Timezone: "Local",
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs 101`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
metricA.Fields())
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
assert.Equal(t, int64(1465040505000000000), metricA.UnixNano())
metricB, err := p.ParseLine(`[04/06/2016--12:41:45] 1.25 mystring dropme nomodifier`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, time.Date(2016, time.June, 4, 12, 41, 45, 0, time.Local).UnixNano(), metricB.UnixNano())
}

View File

@ -1,6 +1,6 @@
package grok package grok
// THIS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns // DEFAULT_PATTERNS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns
const DEFAULT_PATTERNS = ` const DEFAULT_PATTERNS = `
# Captures are a slightly modified version of logstash "grok" patterns, with # Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]} # the format %{<capture syntax>[:<semantic name>][:<modifier>]}

View File

@ -16,11 +16,13 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok" "github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
) )
// LogParser in the primary interface for the plugin
type LogParser interface { type LogParser interface {
ParseLine(line string) (telegraf.Metric, error) ParseLine(line string) (telegraf.Metric, error)
Compile() error Compile() error
} }
// LogParserPlugin is the primary struct to implement the interface for logparser plugin
type LogParserPlugin struct { type LogParserPlugin struct {
Files []string Files []string
FromBeginning bool FromBeginning bool
@ -45,6 +47,7 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file ## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"] files = ["/var/log/apache/access.log"]
## Read files that currently exist from the beginning. Files that are created ## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always ## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning. ## be read from the beginning.
@ -60,23 +63,40 @@ const sampleConfig = `
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{COMBINED_LOG_FORMAT}"] patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name. ## Name of the outputted measurement name.
measurement = "apache_access_log" measurement = "apache_access_log"
## Full path(s) to custom pattern files. ## Full path(s) to custom pattern files.
custom_pattern_files = [] custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line. ## Custom patterns can also be defined here. Put one pattern per line.
custom_patterns = ''' custom_patterns = '''
## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45 data one two 5.43µs
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
timezone = "Canada/Eastern"
''' '''
` `
// SampleConfig returns the sample configuration for the plugin
func (l *LogParserPlugin) SampleConfig() string { func (l *LogParserPlugin) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description returns the human readable description for the plugin
func (l *LogParserPlugin) Description() string { func (l *LogParserPlugin) Description() string {
return "Stream and parse log file(s)." return "Stream and parse log file(s)."
} }
// Gather is the primary function to collect the metrics for the plugin
func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error { func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -85,6 +105,7 @@ func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
return l.tailNewfiles(true) return l.tailNewfiles(true)
} }
// Start kicks off collection of stats for the plugin
func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -113,7 +134,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
} }
if len(l.parsers) == 0 { if len(l.parsers) == 0 {
return fmt.Errorf("ERROR: logparser input plugin: no parser defined.") return fmt.Errorf("logparser input plugin: no parser defined")
} }
// compile log parser patterns: // compile log parser patterns:
@ -147,7 +168,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
} }
files := g.Match() files := g.Match()
for file, _ := range files { for file := range files {
if _, ok := l.tailers[file]; ok { if _, ok := l.tailers[file]; ok {
// we're already tailing this file // we're already tailing this file
continue continue
@ -225,6 +246,7 @@ func (l *LogParserPlugin) parser() {
} }
} }
// Stop will end the metrics collection process on file tailers
func (l *LogParserPlugin) Stop() { func (l *LogParserPlugin) Stop() {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()

View File

@ -102,9 +102,7 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
assert.Equal(t, acc.NFields(), 0) assert.Equal(t, acc.NFields(), 0)
os.Symlink( _ = os.Symlink(thisdir+"grok/testdata/test_a.log", emptydir+"/test_a.log")
thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log")
assert.NoError(t, acc.GatherError(logparser.Gather)) assert.NoError(t, acc.GatherError(logparser.Gather))
acc.Wait(1) acc.Wait(1)