Add path tag to logparser containing path of logfile (#3098)
This commit is contained in:
		
							parent
							
								
									1fb5373962
								
							
						
					
					
						commit
						3e27134872
					
				|  | @ -23,13 +23,18 @@ type LogParser interface { | ||||||
| 	Compile() error | 	Compile() error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type logEntry struct { | ||||||
|  | 	path string | ||||||
|  | 	line string | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // LogParserPlugin is the primary struct to implement the interface for logparser plugin
 | // LogParserPlugin is the primary struct to implement the interface for logparser plugin
 | ||||||
| type LogParserPlugin struct { | type LogParserPlugin struct { | ||||||
| 	Files         []string | 	Files         []string | ||||||
| 	FromBeginning bool | 	FromBeginning bool | ||||||
| 
 | 
 | ||||||
| 	tailers map[string]*tail.Tail | 	tailers map[string]*tail.Tail | ||||||
| 	lines   chan string | 	lines   chan logEntry | ||||||
| 	done    chan struct{} | 	done    chan struct{} | ||||||
| 	wg      sync.WaitGroup | 	wg      sync.WaitGroup | ||||||
| 	acc     telegraf.Accumulator | 	acc     telegraf.Accumulator | ||||||
|  | @ -112,7 +117,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { | ||||||
| 	defer l.Unlock() | 	defer l.Unlock() | ||||||
| 
 | 
 | ||||||
| 	l.acc = acc | 	l.acc = acc | ||||||
| 	l.lines = make(chan string, 1000) | 	l.lines = make(chan logEntry, 1000) | ||||||
| 	l.done = make(chan struct{}) | 	l.done = make(chan struct{}) | ||||||
| 	l.tailers = make(map[string]*tail.Tail) | 	l.tailers = make(map[string]*tail.Tail) | ||||||
| 
 | 
 | ||||||
|  | @ -214,9 +219,14 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { | ||||||
| 		// Fix up files with Windows line endings.
 | 		// Fix up files with Windows line endings.
 | ||||||
| 		text := strings.TrimRight(line.Text, "\r") | 		text := strings.TrimRight(line.Text, "\r") | ||||||
| 
 | 
 | ||||||
|  | 		entry := logEntry{ | ||||||
|  | 			path: tailer.Filename, | ||||||
|  | 			line: text, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		select { | 		select { | ||||||
| 		case <-l.done: | 		case <-l.done: | ||||||
| 		case l.lines <- text: | 		case l.lines <- entry: | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -229,22 +239,23 @@ func (l *LogParserPlugin) parser() { | ||||||
| 
 | 
 | ||||||
| 	var m telegraf.Metric | 	var m telegraf.Metric | ||||||
| 	var err error | 	var err error | ||||||
| 	var line string | 	var entry logEntry | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-l.done: | 		case <-l.done: | ||||||
| 			return | 			return | ||||||
| 		case line = <-l.lines: | 		case entry = <-l.lines: | ||||||
| 			if line == "" || line == "\n" { | 			if entry.line == "" || entry.line == "\n" { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 		for _, parser := range l.parsers { | 		for _, parser := range l.parsers { | ||||||
| 			m, err = parser.ParseLine(line) | 			m, err = parser.ParseLine(entry.line) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
|  | 				tags := m.Tags() | ||||||
|  | 				tags["path"] = entry.path | ||||||
| 				if m != nil { | 				if m != nil { | ||||||
| 					l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) | 					l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) | ||||||
| 				} | 				} | ||||||
| 			} else { | 			} else { | ||||||
| 				log.Println("E! Error parsing log line: " + err.Error()) | 				log.Println("E! Error parsing log line: " + err.Error()) | ||||||
|  |  | ||||||
|  | @ -69,7 +69,10 @@ func TestGrokParseLogFiles(t *testing.T) { | ||||||
| 			"response_time": int64(5432), | 			"response_time": int64(5432), | ||||||
| 			"myint":         int64(101), | 			"myint":         int64(101), | ||||||
| 		}, | 		}, | ||||||
| 		map[string]string{"response_code": "200"}) | 		map[string]string{ | ||||||
|  | 			"response_code": "200", | ||||||
|  | 			"path":          thisdir + "grok/testdata/test_a.log", | ||||||
|  | 		}) | ||||||
| 
 | 
 | ||||||
| 	acc.AssertContainsTaggedFields(t, "logparser_grok", | 	acc.AssertContainsTaggedFields(t, "logparser_grok", | ||||||
| 		map[string]interface{}{ | 		map[string]interface{}{ | ||||||
|  | @ -77,7 +80,9 @@ func TestGrokParseLogFiles(t *testing.T) { | ||||||
| 			"mystring":   "mystring", | 			"mystring":   "mystring", | ||||||
| 			"nomodifier": "nomodifier", | 			"nomodifier": "nomodifier", | ||||||
| 		}, | 		}, | ||||||
| 		map[string]string{}) | 		map[string]string{ | ||||||
|  | 			"path": thisdir + "grok/testdata/test_b.log", | ||||||
|  | 		}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestGrokParseLogFilesAppearLater(t *testing.T) { | func TestGrokParseLogFilesAppearLater(t *testing.T) { | ||||||
|  | @ -115,7 +120,10 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { | ||||||
| 			"response_time": int64(5432), | 			"response_time": int64(5432), | ||||||
| 			"myint":         int64(101), | 			"myint":         int64(101), | ||||||
| 		}, | 		}, | ||||||
| 		map[string]string{"response_code": "200"}) | 		map[string]string{ | ||||||
|  | 			"response_code": "200", | ||||||
|  | 			"path":          emptydir + "/test_a.log", | ||||||
|  | 		}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Test that test_a.log line gets parsed even though we don't have the correct
 | // Test that test_a.log line gets parsed even though we don't have the correct
 | ||||||
|  | @ -148,7 +156,10 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { | ||||||
| 			"response_time": int64(5432), | 			"response_time": int64(5432), | ||||||
| 			"myint":         int64(101), | 			"myint":         int64(101), | ||||||
| 		}, | 		}, | ||||||
| 		map[string]string{"response_code": "200"}) | 		map[string]string{ | ||||||
|  | 			"response_code": "200", | ||||||
|  | 			"path":          thisdir + "grok/testdata/test_a.log", | ||||||
|  | 		}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func getCurrentDir() string { | func getCurrentDir() string { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue