diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d1b1bb5b..8e5865e7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ It is highly recommended that all users migrate to the new riemann output plugin ### Features +- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files. - [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin. - [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin. - [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations. diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 0778a8a6d..8ec328358 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -26,7 +26,7 @@ type LogParserPlugin struct { Files []string FromBeginning bool - tailers []*tail.Tail + tailers map[string]*tail.Tail lines chan string done chan struct{} wg sync.WaitGroup @@ -46,7 +46,9 @@ const sampleConfig = ` ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/apache.log -> only tail the apache log file files = ["/var/log/apache/access.log"] - ## Read file from beginning. + ## Read files that currently exist from the beginning. Files that are created + ## while telegraf is running (and that match the "files" globs) will always + ## be read from the beginning. from_beginning = false ## Parse logstash-style "grok" patterns: @@ -77,7 +79,11 @@ func (l *LogParserPlugin) Description() string { } func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error { - return nil + l.Lock() + defer l.Unlock() + + // always start from the beginning of files that appear while we're running + return l.tailNewfiles(true) } func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { @@ -87,6 +93,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.acc = acc l.lines = make(chan string, 1000) l.done = make(chan struct{}) + l.tailers = make(map[string]*tail.Tail) // Looks for fields which implement LogParser interface l.parsers = []LogParser{} @@ -121,14 +128,22 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { return err } + l.wg.Add(1) + go l.parser() + + return l.tailNewfiles(l.FromBeginning) +} + +// check the globs against files on disk, and start tailing any new files. +// Assumes l's lock is held! +func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { var seek tail.SeekInfo - if !l.FromBeginning { + if !fromBeginning { seek.Whence = 2 seek.Offset = 0 } - l.wg.Add(1) - go l.parser() + errChan := errchan.New(len(l.Files)) // Create a "tailer" for each file for _, filepath := range l.Files { @@ -139,7 +154,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } files := g.Match() errChan = errchan.New(len(files)) + for file, _ := range files { + if _, ok := l.tailers[file]; ok { + // we're already tailing this file + continue + } + tailer, err := tail.TailFile(file, tail.Config{ ReOpen: true, @@ -152,7 +173,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { // create a goroutine for each "tailer" l.wg.Add(1) go l.receiver(tailer) - l.tailers = append(l.tailers, tailer) + l.tailers[file] = tailer } } @@ -166,6 +187,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { var line *tail.Line for line = range tailer.Lines { + if line.Err != nil { log.Printf("E! Error tailing file %s, Error: %s\n", tailer.Filename, line.Err) diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 97f33067e..059bfd266 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -1,6 +1,8 @@ package logparser import ( + "io/ioutil" + "os" "runtime" "strings" "testing" @@ -80,6 +82,47 @@ func TestGrokParseLogFiles(t *testing.T) { map[string]string{}) } +func TestGrokParseLogFilesAppearLater(t *testing.T) { + emptydir, err := ioutil.TempDir("", "TestGrokParseLogFilesAppearLater") + defer os.RemoveAll(emptydir) + assert.NoError(t, err) + + thisdir := getCurrentDir() + p := &grok.Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"}, + } + + logparser := &LogParserPlugin{ + FromBeginning: true, + Files: []string{emptydir + "/*.log"}, + GrokParser: p, + } + + acc := testutil.Accumulator{} + assert.NoError(t, logparser.Start(&acc)) + + time.Sleep(time.Millisecond * 500) + assert.Equal(t, acc.NFields(), 0) + + os.Symlink( + thisdir+"grok/testdata/test_a.log", + emptydir+"/test_a.log") + assert.NoError(t, logparser.Gather(&acc)) + time.Sleep(time.Millisecond * 500) + + logparser.Stop() + + acc.AssertContainsTaggedFields(t, "logparser_grok", + map[string]interface{}{ + "clientip": "192.168.1.1", + "myfloat": float64(1.25), + "response_time": int64(5432), + "myint": int64(101), + }, + map[string]string{"response_code": "200"}) +} + // Test that test_a.log line gets parsed even though we don't have the correct // pattern available for test_b.log func TestGrokParseLogFilesOneBad(t *testing.T) {