From 981dd5bfc0ea72c8d9858aaff494f69d2298cbb1 Mon Sep 17 00:00:00 2001
From: Matthew Crenshaw <3420325+sgtsquiggs@users.noreply.github.com>
Date: Thu, 11 Jul 2019 20:39:59 -0400
Subject: [PATCH] Resume from last known offset when reloading in tail input (#6074)

---
 plugins/inputs/logparser/logparser.go | 88 ++++++++++++++++++++++-----
 plugins/inputs/tail/tail.go           | 78 +++++++++++++++++++-----
 plugins/inputs/tail/tail_test.go      |  2 +-
 3 files changed, 135 insertions(+), 33 deletions(-)

diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go
index e724f2d4b..c132ba7a2 100644
--- a/plugins/inputs/logparser/logparser.go
+++ b/plugins/inputs/logparser/logparser.go
@@ -3,11 +3,13 @@
 package logparser
 
 import (
+	"fmt"
 	"log"
 	"strings"
 	"sync"
 
 	"github.com/influxdata/tail"
+
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal/globpath"
 	"github.com/influxdata/telegraf/plugins/inputs"
@@ -19,6 +21,11 @@ const (
 	defaultWatchMethod = "inotify"
 )
 
+var (
+	offsets      = make(map[string]int64)
+	offsetsMutex = new(sync.Mutex)
+)
+
 // LogParser in the primary interface for the plugin
 type GrokConfig struct {
 	MeasurementName    string `toml:"measurement"`
@@ -42,6 +49,7 @@ type LogParserPlugin struct {
 	WatchMethod   string
 
 	tailers map[string]*tail.Tail
+	offsets map[string]int64
 	lines   chan logEntry
 	done    chan struct{}
 	wg      sync.WaitGroup
@@ -53,6 +61,20 @@ type LogParserPlugin struct {
 	GrokConfig GrokConfig `toml:"grok"`
 }
 
+func NewLogParser() *LogParserPlugin {
+	offsetsMutex.Lock()
+	offsetsCopy := make(map[string]int64, len(offsets))
+	for k, v := range offsets {
+		offsetsCopy[k] = v
+	}
+	offsetsMutex.Unlock()
+
+	return &LogParserPlugin{
+		WatchMethod: defaultWatchMethod,
+		offsets:     offsetsCopy,
+	}
+}
+
 const sampleConfig = `
   ## Log files to parse.
   ## These accept standard unix glob matching rules, but with the addition of
@@ -161,18 +183,21 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
 	l.wg.Add(1)
 	go l.parser()
 
-	return l.tailNewfiles(l.FromBeginning)
+	err = l.tailNewfiles(l.FromBeginning)
+
+	// clear offsets
+	l.offsets = make(map[string]int64)
+	// assumption that once Start is called, all parallel plugins have already been initialized
+	offsetsMutex.Lock()
+	offsets = make(map[string]int64)
+	offsetsMutex.Unlock()
+
+	return err
 }
 
 // check the globs against files on disk, and start tailing any new files.
 // Assumes l's lock is held!
 func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
-	var seek tail.SeekInfo
-	if !fromBeginning {
-		seek.Whence = 2
-		seek.Offset = 0
-	}
-
 	var poll bool
 	if l.WatchMethod == "poll" {
 		poll = true
@@ -182,7 +207,7 @@
 	for _, filepath := range l.Files {
 		g, err := globpath.Compile(filepath)
 		if err != nil {
-			log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
+			log.Printf("E! [inputs.logparser] Error Glob %s failed to compile, %s", filepath, err)
 			continue
 		}
 		files := g.Match()
@@ -193,11 +218,27 @@
 				continue
 			}
 
+			var seek *tail.SeekInfo
+			if !fromBeginning {
+				if offset, ok := l.offsets[file]; ok {
+					log.Printf("D! [inputs.logparser] using offset %d for file: %v", offset, file)
+					seek = &tail.SeekInfo{
+						Whence: 0,
+						Offset: offset,
+					}
+				} else {
+					seek = &tail.SeekInfo{
+						Whence: 2,
+						Offset: 0,
+					}
+				}
+			}
+
 			tailer, err := tail.TailFile(file,
 				tail.Config{
 					ReOpen:    true,
 					Follow:    true,
-					Location:  &seek,
+					Location:  seek,
 					MustExist: true,
 					Poll:      poll,
 					Logger:    tail.DiscardingLogger,
@@ -228,7 +269,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
 	for line = range tailer.Lines {
 		if line.Err != nil {
-			log.Printf("E! Error tailing file %s, Error: %s\n",
+			log.Printf("E! [inputs.logparser] Error tailing file %s, Error: %s",
 				tailer.Filename, line.Err)
 			continue
 		}
 
@@ -274,7 +315,7 @@
 				l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
 			}
 		} else {
-			log.Println("E! Error parsing log line: " + err.Error())
+			log.Println("E! [inputs.logparser] Error parsing log line: " + err.Error())
 		}
 
 	}
@@ -286,23 +327,38 @@ func (l *LogParserPlugin) Stop() {
 	defer l.Unlock()
 
 	for _, t := range l.tailers {
+		if !l.FromBeginning {
+			// store offset for resume
+			offset, err := t.Tell()
+			if err == nil {
+				l.offsets[t.Filename] = offset
+				log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename)
+			} else {
+				l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
+			}
+		}
 		err := t.Stop()
 		//message for a stopped tailer
-		log.Printf("D! tail dropped for file: %v", t.Filename)
+		log.Printf("D! [inputs.logparser] tail dropped for file: %v", t.Filename)
 		if err != nil {
-			log.Printf("E! Error stopping tail on file %s\n", t.Filename)
+			log.Printf("E! [inputs.logparser] Error stopping tail on file %s", t.Filename)
 		}
 	}
 
 	close(l.done)
 	l.wg.Wait()
+
+	// persist offsets
+	offsetsMutex.Lock()
+	for k, v := range l.offsets {
+		offsets[k] = v
+	}
+	offsetsMutex.Unlock()
 }
 
 func init() {
 	inputs.Add("logparser", func() telegraf.Input {
-		return &LogParserPlugin{
-			WatchMethod: defaultWatchMethod,
-		}
+		return NewLogParser()
 	})
 }
diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go
index 834d7cf8f..245010764 100644
--- a/plugins/inputs/tail/tail.go
+++ b/plugins/inputs/tail/tail.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 
 	"github.com/influxdata/tail"
+
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal/globpath"
 	"github.com/influxdata/telegraf/plugins/inputs"
@@ -19,6 +20,11 @@ const (
 	defaultWatchMethod = "inotify"
 )
 
+var (
+	offsets      = make(map[string]int64)
+	offsetsMutex = new(sync.Mutex)
+)
+
 type Tail struct {
 	Files         []string
 	FromBeginning bool
@@ -26,6 +32,7 @@ type Tail struct {
 	WatchMethod   string
 
 	tailers map[string]*tail.Tail
+	offsets map[string]int64
 	parserFunc parsers.ParserFunc
 	wg         sync.WaitGroup
 	acc        telegraf.Accumulator
@@ -34,8 +41,16 @@ type Tail struct {
 }
 
 func NewTail() *Tail {
+	offsetsMutex.Lock()
+	offsetsCopy := make(map[string]int64, len(offsets))
+	for k, v := range offsets {
+		offsetsCopy[k] = v
+	}
+	offsetsMutex.Unlock()
+
 	return &Tail{
 		FromBeginning: false,
+		offsets:       offsetsCopy,
 	}
 }
 
@@ -87,18 +102,19 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
 	t.acc = acc
 	t.tailers = make(map[string]*tail.Tail)
 
-	return t.tailNewFiles(t.FromBeginning)
+	err := t.tailNewFiles(t.FromBeginning)
+
+	// clear offsets
+	t.offsets = make(map[string]int64)
+	// assumption that once Start is called, all parallel plugins have already been initialized
+	offsetsMutex.Lock()
+	offsets = make(map[string]int64)
+	offsetsMutex.Unlock()
+
+	return err
 }
 
 func (t *Tail) tailNewFiles(fromBeginning bool) error {
-	var seek *tail.SeekInfo
-	if !t.Pipe && !fromBeginning {
-		seek = &tail.SeekInfo{
-			Whence: 2,
-			Offset: 0,
-		}
-	}
-
 	var poll bool
 	if t.WatchMethod == "poll" {
 		poll = true
@@ -108,7 +124,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
 	for _, filepath := range t.Files {
 		g, err := globpath.Compile(filepath)
 		if err != nil {
-			t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
+			t.acc.AddError(fmt.Errorf("glob %s failed to compile, %s", filepath, err))
 		}
 		for _, file := range g.Match() {
 			if _, ok := t.tailers[file]; ok {
@@ -116,6 +132,22 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
 				continue
 			}
 
+			var seek *tail.SeekInfo
+			if !t.Pipe && !fromBeginning {
+				if offset, ok := t.offsets[file]; ok {
+					log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
+					seek = &tail.SeekInfo{
+						Whence: 0,
+						Offset: offset,
+					}
+				} else {
+					seek = &tail.SeekInfo{
+						Whence: 2,
+						Offset: 0,
+					}
+				}
+			}
+
 			tailer, err := tail.TailFile(file,
 				tail.Config{
 					ReOpen: true,
@@ -159,8 +191,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
 	var line *tail.Line
 	for line = range tailer.Lines {
 		if line.Err != nil {
-			t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
-				tailer.Filename, err))
+			t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
 			continue
 		}
 		// Fix up files with Windows line endings.
@@ -188,7 +219,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
 				t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
 			}
 		} else {
-			t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
+			t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
 				tailer.Filename, line.Text, err))
 		}
 	}
@@ -196,8 +227,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
 
 	log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)
 
 	if err := tailer.Err(); err != nil {
-		t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
-			tailer.Filename, err))
+		t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
 	}
 }
@@ -206,13 +236,30 @@ func (t *Tail) Stop() {
 	defer t.Unlock()
 
 	for _, tailer := range t.tailers {
+		if !t.Pipe && !t.FromBeginning {
+			// store offset for resume
+			offset, err := tailer.Tell()
+			if err == nil {
+				t.offsets[tailer.Filename] = offset
+				log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename)
+			} else {
+				t.acc.AddError(fmt.Errorf("error recording offset for file %s", tailer.Filename))
+			}
+		}
 		err := tailer.Stop()
 		if err != nil {
-			t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
+			t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename))
 		}
 	}
 
 	t.wg.Wait()
+
+	// persist offsets
+	offsetsMutex.Lock()
+	for k, v := range t.offsets {
+		offsets[k] = v
+	}
+	offsetsMutex.Unlock()
 }
 
 func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go
index 06db2c172..fb5e05a76 100644
--- a/plugins/inputs/tail/tail_test.go
+++ b/plugins/inputs/tail/tail_test.go
@@ -108,7 +108,7 @@ func TestTailBadLine(t *testing.T) {
 	require.NoError(t, err)
 
 	acc.WaitError(1)
-	assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
+	assert.Contains(t, acc.Errors[0].Error(), "malformed log line")
 }
 
 func TestTailDosLineendings(t *testing.T) {
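
Below is a minimal, self-contained sketch (not part of the patch) of the resume mechanism the diff introduces: offsets live in a package-level map guarded by a mutex so they survive a plugin reload within the same process, are recorded with Tell() on stop, and are restored through tail.SeekInfo on the next start. It uses only the github.com/influxdata/tail calls already visible in the diff; the helper names (startTail, stopTail) and the log path are hypothetical.

package main

import (
	"log"
	"sync"

	"github.com/influxdata/tail"
)

// Package-level state survives Stop/Start cycles because Telegraf reloads
// plugins inside the same process.
var (
	offsets      = make(map[string]int64) // file path -> last read offset
	offsetsMutex sync.Mutex
)

// startTail (hypothetical helper) opens a file at the previously recorded
// offset if one exists, otherwise it seeks to the end of the file
// (Whence: 2), mirroring the behavior the patch adds.
func startTail(file string) (*tail.Tail, error) {
	offsetsMutex.Lock()
	offset, ok := offsets[file]
	offsetsMutex.Unlock()

	seek := &tail.SeekInfo{Whence: 2, Offset: 0} // default: start at EOF
	if ok {
		log.Printf("using offset %d for file: %v", offset, file)
		seek = &tail.SeekInfo{Whence: 0, Offset: offset} // resume where we left off
	}

	return tail.TailFile(file, tail.Config{
		Follow:   true,
		ReOpen:   true,
		Location: seek,
		Logger:   tail.DiscardingLogger,
	})
}

// stopTail (hypothetical helper) records the current position before
// stopping, so the next startTail call for the same path resumes there.
func stopTail(t *tail.Tail) error {
	if offset, err := t.Tell(); err == nil {
		offsetsMutex.Lock()
		offsets[t.Filename] = offset
		offsetsMutex.Unlock()
		log.Printf("recording offset %d for file: %v", offset, t.Filename)
	} else {
		log.Printf("error recording offset for file %s: %v", t.Filename, err)
	}
	return t.Stop()
}

func main() {
	// Hypothetical path, used only to exercise the helpers.
	t, err := startTail("/var/log/example.log")
	if err != nil {
		log.Fatal(err)
	}
	// Blocks until a line is appended to the file, then simulates a reload.
	for line := range t.Lines {
		if line.Err != nil {
			continue
		}
		log.Print(line.Text)
		break
	}
	if err := stopTail(t); err != nil {
		log.Print(err)
	}
	// A second startTail("/var/log/example.log") would now resume from the
	// recorded offset instead of seeking to EOF.
}

In the patch itself, NewTail and NewLogParser copy the shared map into the plugin instance and Start clears the shared state again, so recorded offsets are consumed exactly once per reload.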