Resume from last known offset when reloading in tail input (#6074)
parent a0fec3cd82
commit 981dd5bfc0
@@ -3,11 +3,13 @@
 package logparser
 
 import (
+    "fmt"
     "log"
     "strings"
     "sync"
 
     "github.com/influxdata/tail"
 
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/internal/globpath"
     "github.com/influxdata/telegraf/plugins/inputs"
@@ -19,6 +21,11 @@ const (
     defaultWatchMethod = "inotify"
 )
 
+var (
+    offsets      = make(map[string]int64)
+    offsetsMutex = new(sync.Mutex)
+)
+
 // LogParser in the primary interface for the plugin
 type GrokConfig struct {
     MeasurementName string `toml:"measurement"`
@@ -42,6 +49,7 @@ type LogParserPlugin struct {
     WatchMethod string
 
     tailers map[string]*tail.Tail
+    offsets map[string]int64
     lines   chan logEntry
     done    chan struct{}
     wg      sync.WaitGroup
@@ -53,6 +61,20 @@ type LogParserPlugin struct {
     GrokConfig GrokConfig `toml:"grok"`
 }
 
+func NewLogParser() *LogParserPlugin {
+    offsetsMutex.Lock()
+    offsetsCopy := make(map[string]int64, len(offsets))
+    for k, v := range offsets {
+        offsetsCopy[k] = v
+    }
+    offsetsMutex.Unlock()
+
+    return &LogParserPlugin{
+        WatchMethod: defaultWatchMethod,
+        offsets:     offsetsCopy,
+    }
+}
+
 const sampleConfig = `
   ## Log files to parse.
   ## These accept standard unix glob matching rules, but with the addition of
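
The constructor above snapshots a package-level offsets map that survives a configuration reload, because the process (and this package's variables) stays alive while plugin instances are recreated. Below is a minimal standalone sketch of that pattern; the names offsetCache, cacheMutex, and Plugin are illustrative and are not Telegraf identifiers.

```go
// Standalone sketch of the offset-cache pattern introduced above.
package main

import (
	"fmt"
	"sync"
)

// Package-level state outlives individual plugin instances across a reload.
var (
	offsetCache = make(map[string]int64)
	cacheMutex  sync.Mutex
)

type Plugin struct {
	offsets map[string]int64
}

// NewPlugin snapshots the shared cache so a new instance starts with the
// offsets recorded by the previous instance's Stop.
func NewPlugin() *Plugin {
	cacheMutex.Lock()
	defer cacheMutex.Unlock()

	copied := make(map[string]int64, len(offsetCache))
	for k, v := range offsetCache {
		copied[k] = v
	}
	return &Plugin{offsets: copied}
}

func main() {
	cacheMutex.Lock()
	offsetCache["/var/log/app.log"] = 1024 // pretend a previous Stop stored this
	cacheMutex.Unlock()

	p := NewPlugin()
	fmt.Println(p.offsets["/var/log/app.log"]) // 1024
}
```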
@@ -161,18 +183,21 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
     l.wg.Add(1)
     go l.parser()
 
-    return l.tailNewfiles(l.FromBeginning)
+    err = l.tailNewfiles(l.FromBeginning)
+
+    // clear offsets
+    l.offsets = make(map[string]int64)
+    // assumption that once Start is called, all parallel plugins have already been initialized
+    offsetsMutex.Lock()
+    offsets = make(map[string]int64)
+    offsetsMutex.Unlock()
+
+    return err
 }
 
 // check the globs against files on disk, and start tailing any new files.
 // Assumes l's lock is held!
 func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
-    var seek tail.SeekInfo
-    if !fromBeginning {
-        seek.Whence = 2
-        seek.Offset = 0
-    }
-
     var poll bool
     if l.WatchMethod == "poll" {
         poll = true
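
Start consumes the copied offsets and then resets both the instance map and the package-level map, relying on the stated assumption that every parallel plugin instance has already taken its copy by then. A small hedged sketch of that consume-once behaviour, with illustrative names rather than the plugin's:

```go
// Sketch of the consume-once handling of the shared offset cache.
package main

import (
	"fmt"
	"sync"
)

var (
	offsetCache = map[string]int64{"/var/log/app.log": 1024}
	cacheMutex  sync.Mutex
)

// consumeOffsets hands the cached offsets to the caller exactly once and
// resets the shared map so stale entries cannot leak into later restarts.
func consumeOffsets() map[string]int64 {
	cacheMutex.Lock()
	defer cacheMutex.Unlock()

	out := offsetCache
	offsetCache = make(map[string]int64)
	return out
}

func main() {
	fmt.Println(consumeOffsets()) // map[/var/log/app.log:1024]
	fmt.Println(consumeOffsets()) // map[] on a second call
}
```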
@@ -182,7 +207,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
     for _, filepath := range l.Files {
         g, err := globpath.Compile(filepath)
         if err != nil {
-            log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
+            log.Printf("E! [inputs.logparser] Error Glob %s failed to compile, %s", filepath, err)
             continue
         }
         files := g.Match()
@@ -193,11 +218,27 @@
             continue
         }
 
+        var seek *tail.SeekInfo
+        if !fromBeginning {
+            if offset, ok := l.offsets[file]; ok {
+                log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
+                seek = &tail.SeekInfo{
+                    Whence: 0,
+                    Offset: offset,
+                }
+            } else {
+                seek = &tail.SeekInfo{
+                    Whence: 2,
+                    Offset: 0,
+                }
+            }
+        }
+
         tailer, err := tail.TailFile(file,
             tail.Config{
                 ReOpen:    true,
                 Follow:    true,
-                Location:  &seek,
+                Location:  seek,
                 MustExist: true,
                 Poll:      poll,
                 Logger:    tail.DiscardingLogger,
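
This hunk decides how each newly matched file is opened: resume at a previously recorded offset when one exists, otherwise seek to the end so old data is not re-read, and pass the result as tail.Config.Location. The sketch below shows the same decision against the github.com/influxdata/tail API the plugin already imports; the savedOffsets map and the file path are made up for illustration.

```go
// Sketch of the seek selection used by the plugin when (re)opening a file.
package main

import (
	"log"

	"github.com/influxdata/tail"
)

func main() {
	savedOffsets := map[string]int64{"/var/log/app.log": 1024} // e.g. copied from a previous run
	file := "/var/log/app.log"
	fromBeginning := false

	var seek *tail.SeekInfo
	if !fromBeginning {
		if offset, ok := savedOffsets[file]; ok {
			// Resume exactly where the previous tailer left off.
			seek = &tail.SeekInfo{Whence: 0, Offset: offset} // Whence 0 == io.SeekStart
		} else {
			// No saved position: start at the end so old data is not re-read.
			seek = &tail.SeekInfo{Whence: 2, Offset: 0} // Whence 2 == io.SeekEnd
		}
	}
	// A nil Location (the fromBeginning case) makes tail start at offset 0.

	t, err := tail.TailFile(file, tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Location:  seek,
	})
	if err != nil {
		log.Fatal(err)
	}
	for line := range t.Lines {
		log.Println(line.Text)
	}
}
```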
@@ -228,7 +269,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
     for line = range tailer.Lines {
 
         if line.Err != nil {
-            log.Printf("E! Error tailing file %s, Error: %s\n",
+            log.Printf("E! [inputs.logparser] Error tailing file %s, Error: %s",
                 tailer.Filename, line.Err)
             continue
         }
@@ -274,7 +315,7 @@ func (l *LogParserPlugin) parser() {
                 l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
             }
         } else {
-            log.Println("E! Error parsing log line: " + err.Error())
+            log.Println("E! [inputs.logparser] Error parsing log line: " + err.Error())
         }
 
     }
@@ -286,23 +327,38 @@ func (l *LogParserPlugin) Stop() {
     defer l.Unlock()
 
     for _, t := range l.tailers {
+        if !l.FromBeginning {
+            // store offset for resume
+            offset, err := t.Tell()
+            if err == nil {
+                l.offsets[t.Filename] = offset
+                log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename)
+            } else {
+                l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
+            }
+        }
         err := t.Stop()
 
         //message for a stopped tailer
-        log.Printf("D! tail dropped for file: %v", t.Filename)
+        log.Printf("D! [inputs.logparser] tail dropped for file: %v", t.Filename)
 
         if err != nil {
-            log.Printf("E! Error stopping tail on file %s\n", t.Filename)
+            log.Printf("E! [inputs.logparser] Error stopping tail on file %s", t.Filename)
         }
     }
     close(l.done)
     l.wg.Wait()
+
+    // persist offsets
+    offsetsMutex.Lock()
+    for k, v := range l.offsets {
+        offsets[k] = v
+    }
+    offsetsMutex.Unlock()
 }
 
 func init() {
     inputs.Add("logparser", func() telegraf.Input {
-        return &LogParserPlugin{
-            WatchMethod: defaultWatchMethod,
-        }
+        return NewLogParser()
     })
 }
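
On Stop, each tailer's Tell() position is recorded into the instance map and then merged into the package-level map under the mutex, so the next instance can resume from it. The standard-library sketch below reproduces the same resume round trip without Telegraf or the tail library, assuming a readable /tmp/example.log exists; it is an analogy, not the plugin's code.

```go
// Generic illustration of the resume round trip: read part of a file,
// remember the byte offset, then reopen and seek back to it.
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	const path = "/tmp/example.log"

	// First pass: consume one line and record how far we got.
	f, err := os.Open(path)
	if err != nil {
		log.Fatal(err)
	}
	r := bufio.NewReader(f)
	line, err := r.ReadString('\n')
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
	offset := int64(len(line)) // bytes consumed so far, analogous to a tailer's Tell()
	f.Close()
	fmt.Printf("stopped after %q at offset %d\n", line, offset)

	// Second pass: "reload" by reopening and seeking to the saved offset,
	// so previously handled lines are not emitted again.
	f, err = os.Open(path)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	rest, _ := io.ReadAll(f)
	fmt.Printf("resumed with: %q\n", rest)
}
```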
@@ -9,6 +9,7 @@ import (
     "sync"
 
     "github.com/influxdata/tail"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/internal/globpath"
     "github.com/influxdata/telegraf/plugins/inputs"
@@ -19,6 +20,11 @@ const (
     defaultWatchMethod = "inotify"
 )
 
+var (
+    offsets      = make(map[string]int64)
+    offsetsMutex = new(sync.Mutex)
+)
+
 type Tail struct {
     Files         []string
     FromBeginning bool
@@ -26,6 +32,7 @@ type Tail struct {
     WatchMethod string
 
     tailers    map[string]*tail.Tail
+    offsets    map[string]int64
     parserFunc parsers.ParserFunc
     wg         sync.WaitGroup
     acc        telegraf.Accumulator
@@ -34,8 +41,16 @@ type Tail struct {
 }
 
 func NewTail() *Tail {
+    offsetsMutex.Lock()
+    offsetsCopy := make(map[string]int64, len(offsets))
+    for k, v := range offsets {
+        offsetsCopy[k] = v
+    }
+    offsetsMutex.Unlock()
+
     return &Tail{
         FromBeginning: false,
+        offsets:       offsetsCopy,
     }
 }
 
@@ -87,18 +102,19 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
     t.acc = acc
     t.tailers = make(map[string]*tail.Tail)
 
-    return t.tailNewFiles(t.FromBeginning)
+    err := t.tailNewFiles(t.FromBeginning)
+
+    // clear offsets
+    t.offsets = make(map[string]int64)
+    // assumption that once Start is called, all parallel plugins have already been initialized
+    offsetsMutex.Lock()
+    offsets = make(map[string]int64)
+    offsetsMutex.Unlock()
+
+    return err
 }
 
 func (t *Tail) tailNewFiles(fromBeginning bool) error {
-    var seek *tail.SeekInfo
-    if !t.Pipe && !fromBeginning {
-        seek = &tail.SeekInfo{
-            Whence: 2,
-            Offset: 0,
-        }
-    }
-
     var poll bool
     if t.WatchMethod == "poll" {
         poll = true
@@ -108,7 +124,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
     for _, filepath := range t.Files {
         g, err := globpath.Compile(filepath)
         if err != nil {
-            t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
+            t.acc.AddError(fmt.Errorf("glob %s failed to compile, %s", filepath, err))
         }
         for _, file := range g.Match() {
             if _, ok := t.tailers[file]; ok {
@@ -116,6 +132,22 @@
                 continue
             }
 
+            var seek *tail.SeekInfo
+            if !t.Pipe && !fromBeginning {
+                if offset, ok := t.offsets[file]; ok {
+                    log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
+                    seek = &tail.SeekInfo{
+                        Whence: 0,
+                        Offset: offset,
+                    }
+                } else {
+                    seek = &tail.SeekInfo{
+                        Whence: 2,
+                        Offset: 0,
+                    }
+                }
+            }
+
             tailer, err := tail.TailFile(file,
                 tail.Config{
                     ReOpen: true,
@@ -159,8 +191,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
     var line *tail.Line
     for line = range tailer.Lines {
         if line.Err != nil {
-            t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
-                tailer.Filename, err))
+            t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
             continue
         }
         // Fix up files with Windows line endings.
@@ -188,7 +219,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
                 t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
             }
         } else {
-            t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
+            t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
                 tailer.Filename, line.Text, err))
         }
     }
@@ -196,8 +227,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
     log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)
 
     if err := tailer.Err(); err != nil {
-        t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
-            tailer.Filename, err))
+        t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
     }
 }
 
@@ -206,13 +236,29 @@ func (t *Tail) Stop() {
     defer t.Unlock()
 
     for _, tailer := range t.tailers {
+        if !t.Pipe && !t.FromBeginning {
+            // store offset for resume
+            offset, err := tailer.Tell()
+            if err == nil {
+                log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename)
+            } else {
+                t.acc.AddError(fmt.Errorf("error recording offset for file %s", tailer.Filename))
+            }
+        }
         err := tailer.Stop()
         if err != nil {
-            t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
+            t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename))
         }
     }
 
     t.wg.Wait()
+
+    // persist offsets
+    offsetsMutex.Lock()
+    for k, v := range t.offsets {
+        offsets[k] = v
+    }
+    offsetsMutex.Unlock()
 }
 
 func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
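
The tail input wraps both the seek and the offset recording in a !t.Pipe guard, since a named pipe has no stable byte position to return to. The standalone check below illustrates that distinction; the paths are hypothetical.

```go
// Sketch of why offset bookkeeping is skipped for pipes: only regular files
// can be seeked back to a saved position.
package main

import (
	"fmt"
	"log"
	"os"
)

// seekable reports whether saving and restoring an offset makes sense.
func seekable(path string) (bool, error) {
	fi, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	// Named pipes (FIFOs) and other special files are not reliably seekable.
	return fi.Mode().IsRegular(), nil
}

func main() {
	for _, path := range []string{"/var/log/app.log", "/tmp/app.fifo"} {
		ok, err := seekable(path)
		if err != nil {
			log.Printf("skipping %s: %v", path, err)
			continue
		}
		if ok {
			fmt.Printf("%s: record and restore offsets\n", path)
		} else {
			fmt.Printf("%s: pipe or special file, always read from the current position\n", path)
		}
	}
}
```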
@@ -108,7 +108,7 @@ func TestTailBadLine(t *testing.T) {
     require.NoError(t, err)
 
     acc.WaitError(1)
-    assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
+    assert.Contains(t, acc.Errors[0].Error(), "malformed log line")
 }
 
 func TestTailDosLineendings(t *testing.T) {