From dabb6f54663ca16f8c62d0f725fc3e302b98e87d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 14:44:25 +0100 Subject: [PATCH] Internally name all patterns for log parsing flexibility closes #1436 This also fixes the bad behavior of waiting until runtime to return log parsing pattern compile errors when a pattern was simply unfound. closes #1418 Also protect against user error when the telegraf user does not have permission to open the provided file. We will now error and exit in this case, rather than silently waiting to get permission to open it. --- CHANGELOG.md | 2 ++ plugins/inputs/logparser/grok/grok.go | 22 ++++++++++-- plugins/inputs/logparser/grok/grok_test.go | 39 ++++++++++++++++++++-- plugins/inputs/logparser/logparser.go | 33 +++++++++--------- plugins/inputs/logparser/logparser_test.go | 7 ++-- plugins/inputs/tail/tail.go | 7 ++-- 6 files changed, 84 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0f0cca16..99e8ffe56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,8 @@ should now look like: - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix. - [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin +- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config. +- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 16e62b223..d8691d7b9 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -53,7 +53,12 @@ var ( ) type Parser struct { - Patterns []string + Patterns []string + // namedPatterns is a list of internally-assigned names to the patterns + // specified by the user in Patterns. + // They will look like: + // GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc. + namedPatterns []string CustomPatterns string CustomPatternFiles []string Measurement string @@ -98,13 +103,24 @@ func (p *Parser) Compile() error { return err } - p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns + // Give Patterns fake names so that they can be treated as named + // "custom patterns" + p.namedPatterns = make([]string, len(p.Patterns)) + for i, pattern := range p.Patterns { + name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) + p.CustomPatterns += "\n" + name + " " + pattern + "\n" + p.namedPatterns[i] = "%{" + name + "}" + } + // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse + // them together as the same type of pattern. + p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns if len(p.CustomPatterns) != 0 { scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) p.addCustomPatterns(scanner) } + // Parse any custom pattern files supplied. for _, filename := range p.CustomPatternFiles { file, err := os.Open(filename) if err != nil { @@ -127,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { var values map[string]string // the matching pattern string var patternName string - for _, pattern := range p.Patterns { + for _, pattern := range p.namedPatterns { if values, err = p.g.Parse(pattern, line); err != nil { return nil, err } diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 1181e85ae..295f32609 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -207,7 +207,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) { func TestCompileStringAndParse(t *testing.T) { p := &Parser{ - Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + Patterns: []string{"%{TEST_LOG_A}"}, CustomPatterns: ` DURATION %{NUMBER}[nuµm]?s RESPONSE_CODE %{NUMBER:response_code:tag} @@ -230,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) { assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) } +func TestCompileErrorsOnInvalidPattern(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + RESPONSE_CODE %{NUMBER:response_code:tag} + RESPONSE_TIME %{DURATION:response_time:duration} + TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + `, + } + assert.Error(t, p.Compile()) + + metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.Nil(t, metricA) +} + +func TestParsePatternsWithoutCustom(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) + assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time()) +} + func TestParseEpochNano(t *testing.T) { p := &Parser{ Patterns: []string{"%{MYAPP}"}, @@ -413,7 +448,7 @@ func TestParseErrors(t *testing.T) { TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{} `, } - assert.NoError(t, p.Compile()) + assert.Error(t, p.Compile()) _, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) assert.Error(t, err) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 4737ace65..6b29ea031 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -9,6 +9,7 @@ import ( "github.com/hpcloud/tail" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } // compile log parser patterns: + errChan := errchan.New(len(l.parsers)) for _, parser := range l.parsers { if err := parser.Compile(); err != nil { - return err + errChan.C <- err } } + if err := errChan.Error(); err != nil { + return err + } var seek tail.SeekInfo if !l.FromBeginning { @@ -125,24 +130,25 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - var errS string // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) if err != nil { log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + continue } - for file, _ := range g.Match() { + files := g.Match() + errChan = errchan.New(len(files)) + for file, _ := range files { tailer, err := tail.TailFile(file, tail.Config{ - ReOpen: true, - Follow: true, - Location: &seek, + ReOpen: true, + Follow: true, + Location: &seek, + MustExist: true, }) - if err != nil { - errS += err.Error() + " " - continue - } + errChan.C <- err + // create a goroutine for each "tailer" l.wg.Add(1) go l.receiver(tailer) @@ -150,10 +156,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } } - if errS != "" { - return fmt.Errorf(errS) - } - return nil + return errChan.Error() } // receiver is launched as a goroutine to continuously watch a tailed logfile @@ -201,8 +204,6 @@ func (l *LogParserPlugin) parser() { if m != nil { l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } - } else { - log.Printf("Malformed log line in [%s], Error: %s\n", line, err) } } } diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 095b627ef..97f33067e 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, logparser.Start(&acc)) + assert.Error(t, logparser.Start(&acc)) time.Sleep(time.Millisecond * 500) logparser.Stop() @@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) { map[string]string{}) } +// Test that test_a.log line gets parsed even though we don't have the correct +// pattern available for test_b.log func TestGrokParseLogFilesOneBad(t *testing.T) { thisdir := getCurrentDir() p := &grok.Parser{ @@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { logparser := &LogParserPlugin{ FromBeginning: true, - Files: []string{thisdir + "grok/testdata/*.log"}, + Files: []string{thisdir + "grok/testdata/test_a.log"}, GrokParser: p, } acc := testutil.Accumulator{} + acc.SetDebug(true) assert.NoError(t, logparser.Start(&acc)) time.Sleep(time.Millisecond * 500) diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 7386e053d..942fd6bae 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { for file, _ := range g.Match() { tailer, err := tail.TailFile(file, tail.Config{ - ReOpen: true, - Follow: true, - Location: &seek, + ReOpen: true, + Follow: true, + Location: &seek, + MustExist: true, }) if err != nil { errS += err.Error() + " "