From d9ddf7bfd0f0fff0bd49cb0389a75bfb75c47d59 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 7 Aug 2017 16:16:31 -0700 Subject: [PATCH] Add path tag to logparser containing path of logfile (#3098) --- plugins/inputs/logparser/logparser.go | 29 +++++++++++++++------- plugins/inputs/logparser/logparser_test.go | 19 +++++++++++--- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 0c495f409..27441c0ba 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -23,13 +23,18 @@ type LogParser interface { Compile() error } +type logEntry struct { + path string + line string +} + // LogParserPlugin is the primary struct to implement the interface for logparser plugin type LogParserPlugin struct { Files []string FromBeginning bool tailers map[string]*tail.Tail - lines chan string + lines chan logEntry done chan struct{} wg sync.WaitGroup acc telegraf.Accumulator @@ -112,7 +117,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { defer l.Unlock() l.acc = acc - l.lines = make(chan string, 1000) + l.lines = make(chan logEntry, 1000) l.done = make(chan struct{}) l.tailers = make(map[string]*tail.Tail) @@ -214,9 +219,14 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { // Fix up files with Windows line endings. text := strings.TrimRight(line.Text, "\r") + entry := logEntry{ + path: tailer.Filename, + line: text, + } + select { case <-l.done: - case l.lines <- text: + case l.lines <- entry: } } } @@ -229,22 +239,23 @@ func (l *LogParserPlugin) parser() { var m telegraf.Metric var err error - var line string + var entry logEntry for { select { case <-l.done: return - case line = <-l.lines: - if line == "" || line == "\n" { + case entry = <-l.lines: + if entry.line == "" || entry.line == "\n" { continue } } - for _, parser := range l.parsers { - m, err = parser.ParseLine(line) + m, err = parser.ParseLine(entry.line) if err == nil { + tags := m.Tags() + tags["path"] = entry.path if m != nil { - l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) } } else { log.Println("E! Error parsing log line: " + err.Error()) diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index f2d93f265..98567b4c2 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -69,7 +69,10 @@ func TestGrokParseLogFiles(t *testing.T) { "response_time": int64(5432), "myint": int64(101), }, - map[string]string{"response_code": "200"}) + map[string]string{ + "response_code": "200", + "path": thisdir + "grok/testdata/test_a.log", + }) acc.AssertContainsTaggedFields(t, "logparser_grok", map[string]interface{}{ @@ -77,7 +80,9 @@ func TestGrokParseLogFiles(t *testing.T) { "mystring": "mystring", "nomodifier": "nomodifier", }, - map[string]string{}) + map[string]string{ + "path": thisdir + "grok/testdata/test_b.log", + }) } func TestGrokParseLogFilesAppearLater(t *testing.T) { @@ -115,7 +120,10 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { "response_time": int64(5432), "myint": int64(101), }, - map[string]string{"response_code": "200"}) + map[string]string{ + "response_code": "200", + "path": emptydir + "/test_a.log", + }) } // Test that test_a.log line gets parsed even though we don't have the correct @@ -148,7 +156,10 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { "response_time": int64(5432), "myint": int64(101), }, - map[string]string{"response_code": "200"}) + map[string]string{ + "response_code": "200", + "path": thisdir + "grok/testdata/test_a.log", + }) } func getCurrentDir() string {