Support tailing files created after startup in tail input (#4704)
This commit is contained in:
parent
41d528c8ce
commit
44c2435f64
|
@ -25,7 +25,7 @@ type Tail struct {
|
||||||
Pipe bool
|
Pipe bool
|
||||||
WatchMethod string
|
WatchMethod string
|
||||||
|
|
||||||
tailers []*tail.Tail
|
tailers map[string]*tail.Tail
|
||||||
parser parsers.Parser
|
parser parsers.Parser
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
acc telegraf.Accumulator
|
acc telegraf.Accumulator
|
||||||
|
@ -74,7 +74,10 @@ func (t *Tail) Description() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tail) Gather(acc telegraf.Accumulator) error {
|
func (t *Tail) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
|
||||||
|
return t.tailNewFiles(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tail) Start(acc telegraf.Accumulator) error {
|
func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
|
@ -82,9 +85,14 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
t.acc = acc
|
t.acc = acc
|
||||||
|
t.tailers = make(map[string]*tail.Tail)
|
||||||
|
|
||||||
|
return t.tailNewFiles(t.FromBeginning)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) tailNewFiles(fromBeginning bool) error {
|
||||||
var seek *tail.SeekInfo
|
var seek *tail.SeekInfo
|
||||||
if !t.Pipe && !t.FromBeginning {
|
if !t.Pipe && !fromBeginning {
|
||||||
seek = &tail.SeekInfo{
|
seek = &tail.SeekInfo{
|
||||||
Whence: 2,
|
Whence: 2,
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
|
@ -103,6 +111,11 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
|
t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
|
||||||
}
|
}
|
||||||
for file, _ := range g.Match() {
|
for file, _ := range g.Match() {
|
||||||
|
if _, ok := t.tailers[file]; ok {
|
||||||
|
// we're already tailing this file
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
tailer, err := tail.TailFile(file,
|
tailer, err := tail.TailFile(file,
|
||||||
tail.Config{
|
tail.Config{
|
||||||
ReOpen: true,
|
ReOpen: true,
|
||||||
|
@ -114,16 +127,15 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
Logger: tail.DiscardingLogger,
|
Logger: tail.DiscardingLogger,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(err)
|
t.acc.AddError(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// create a goroutine for each "tailer"
|
// create a goroutine for each "tailer"
|
||||||
t.wg.Add(1)
|
t.wg.Add(1)
|
||||||
go t.receiver(tailer)
|
go t.receiver(tailer)
|
||||||
t.tailers = append(t.tailers, tailer)
|
t.tailers[file] = tailer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue