Add path tag to logparser containing path of logfile (#3098)
This commit is contained in:
parent
07cda8903a
commit
d9ddf7bfd0
|
@ -23,13 +23,18 @@ type LogParser interface {
|
||||||
Compile() error
|
Compile() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type logEntry struct {
|
||||||
|
path string
|
||||||
|
line string
|
||||||
|
}
|
||||||
|
|
||||||
// LogParserPlugin is the primary struct to implement the interface for logparser plugin
|
// LogParserPlugin is the primary struct to implement the interface for logparser plugin
|
||||||
type LogParserPlugin struct {
|
type LogParserPlugin struct {
|
||||||
Files []string
|
Files []string
|
||||||
FromBeginning bool
|
FromBeginning bool
|
||||||
|
|
||||||
tailers map[string]*tail.Tail
|
tailers map[string]*tail.Tail
|
||||||
lines chan string
|
lines chan logEntry
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
acc telegraf.Accumulator
|
acc telegraf.Accumulator
|
||||||
|
@ -112,7 +117,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
l.acc = acc
|
l.acc = acc
|
||||||
l.lines = make(chan string, 1000)
|
l.lines = make(chan logEntry, 1000)
|
||||||
l.done = make(chan struct{})
|
l.done = make(chan struct{})
|
||||||
l.tailers = make(map[string]*tail.Tail)
|
l.tailers = make(map[string]*tail.Tail)
|
||||||
|
|
||||||
|
@ -214,9 +219,14 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
|
||||||
// Fix up files with Windows line endings.
|
// Fix up files with Windows line endings.
|
||||||
text := strings.TrimRight(line.Text, "\r")
|
text := strings.TrimRight(line.Text, "\r")
|
||||||
|
|
||||||
|
entry := logEntry{
|
||||||
|
path: tailer.Filename,
|
||||||
|
line: text,
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-l.done:
|
case <-l.done:
|
||||||
case l.lines <- text:
|
case l.lines <- entry:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -229,22 +239,23 @@ func (l *LogParserPlugin) parser() {
|
||||||
|
|
||||||
var m telegraf.Metric
|
var m telegraf.Metric
|
||||||
var err error
|
var err error
|
||||||
var line string
|
var entry logEntry
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-l.done:
|
case <-l.done:
|
||||||
return
|
return
|
||||||
case line = <-l.lines:
|
case entry = <-l.lines:
|
||||||
if line == "" || line == "\n" {
|
if entry.line == "" || entry.line == "\n" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, parser := range l.parsers {
|
for _, parser := range l.parsers {
|
||||||
m, err = parser.ParseLine(line)
|
m, err = parser.ParseLine(entry.line)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
tags := m.Tags()
|
||||||
|
tags["path"] = entry.path
|
||||||
if m != nil {
|
if m != nil {
|
||||||
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Println("E! Error parsing log line: " + err.Error())
|
log.Println("E! Error parsing log line: " + err.Error())
|
||||||
|
|
|
@ -69,7 +69,10 @@ func TestGrokParseLogFiles(t *testing.T) {
|
||||||
"response_time": int64(5432),
|
"response_time": int64(5432),
|
||||||
"myint": int64(101),
|
"myint": int64(101),
|
||||||
},
|
},
|
||||||
map[string]string{"response_code": "200"})
|
map[string]string{
|
||||||
|
"response_code": "200",
|
||||||
|
"path": thisdir + "grok/testdata/test_a.log",
|
||||||
|
})
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
|
@ -77,7 +80,9 @@ func TestGrokParseLogFiles(t *testing.T) {
|
||||||
"mystring": "mystring",
|
"mystring": "mystring",
|
||||||
"nomodifier": "nomodifier",
|
"nomodifier": "nomodifier",
|
||||||
},
|
},
|
||||||
map[string]string{})
|
map[string]string{
|
||||||
|
"path": thisdir + "grok/testdata/test_b.log",
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGrokParseLogFilesAppearLater(t *testing.T) {
|
func TestGrokParseLogFilesAppearLater(t *testing.T) {
|
||||||
|
@ -115,7 +120,10 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
|
||||||
"response_time": int64(5432),
|
"response_time": int64(5432),
|
||||||
"myint": int64(101),
|
"myint": int64(101),
|
||||||
},
|
},
|
||||||
map[string]string{"response_code": "200"})
|
map[string]string{
|
||||||
|
"response_code": "200",
|
||||||
|
"path": emptydir + "/test_a.log",
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that test_a.log line gets parsed even though we don't have the correct
|
// Test that test_a.log line gets parsed even though we don't have the correct
|
||||||
|
@ -148,7 +156,10 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
|
||||||
"response_time": int64(5432),
|
"response_time": int64(5432),
|
||||||
"myint": int64(101),
|
"myint": int64(101),
|
||||||
},
|
},
|
||||||
map[string]string{"response_code": "200"})
|
map[string]string{
|
||||||
|
"response_code": "200",
|
||||||
|
"path": thisdir + "grok/testdata/test_a.log",
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCurrentDir() string {
|
func getCurrentDir() string {
|
||||||
|
|
Loading…
Reference in New Issue