Internally name all patterns for log parsing flexibility

closes #1436

This also fixes the bad behavior of waiting until runtime to return log
parsing pattern compile errors when a pattern was simply unfound.

closes #1418

Also protect against user error when the telegraf user does not have
permission to open the provided file. We will now error and exit in this
case, rather than silently waiting to get permission to open it.
This commit is contained in:
Cameron Sparr 2016-07-18 14:44:25 +01:00
parent 281a4d5500
commit dabb6f5466
6 changed files with 84 additions and 26 deletions

View File

@ -63,6 +63,8 @@ should now look like:
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
## v1.0 beta 2 [2016-06-21]

View File

@ -53,7 +53,12 @@ var (
)
type Parser struct {
Patterns []string
Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
namedPatterns []string
CustomPatterns string
CustomPatternFiles []string
Measurement string
@ -98,13 +103,24 @@ func (p *Parser) Compile() error {
return err
}
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns))
for i, pattern := range p.Patterns {
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}"
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}
// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename)
if err != nil {
@ -127,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var values map[string]string
// the matching pattern string
var patternName string
for _, pattern := range p.Patterns {
for _, pattern := range p.namedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err
}

View File

@ -207,7 +207,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {
func TestCompileStringAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
Patterns: []string{"%{TEST_LOG_A}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
@ -230,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) {
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
}
func TestCompileErrorsOnInvalidPattern(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
assert.Error(t, p.Compile())
metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
require.Nil(t, metricA)
}
func TestParsePatternsWithoutCustom(t *testing.T) {
p := &Parser{
Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
}
func TestParseEpochNano(t *testing.T) {
p := &Parser{
Patterns: []string{"%{MYAPP}"},
@ -413,7 +448,7 @@ func TestParseErrors(t *testing.T) {
TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
`,
}
assert.NoError(t, p.Compile())
assert.Error(t, p.Compile())
_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
assert.Error(t, err)

View File

@ -9,6 +9,7 @@ import (
"github.com/hpcloud/tail"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
// compile log parser patterns:
errChan := errchan.New(len(l.parsers))
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
return err
errChan.C <- err
}
}
if err := errChan.Error(); err != nil {
return err
}
var seek tail.SeekInfo
if !l.FromBeginning {
@ -125,24 +130,25 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()
var errS string
// Create a "tailer" for each file
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
continue
}
for file, _ := range g.Match() {
files := g.Match()
errChan = errchan.New(len(files))
for file, _ := range files {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "
continue
}
errChan.C <- err
// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
@ -150,10 +156,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
}
if errS != "" {
return fmt.Errorf(errS)
}
return nil
return errChan.Error()
}
// receiver is launched as a goroutine to continuously watch a tailed logfile
@ -201,8 +204,6 @@ func (l *LogParserPlugin) parser() {
if m != nil {
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else {
log.Printf("Malformed log line in [%s], Error: %s\n", line, err)
}
}
}

View File

@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
}
acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))
assert.Error(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500)
logparser.Stop()
@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}
// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
thisdir := getCurrentDir()
p := &grok.Parser{
@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
Files: []string{thisdir + "grok/testdata/test_a.log"},
GrokParser: p,
}
acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500)

View File

@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "