telegraf/plugins/inputs/tail/tail.go


// +build !solaris

package tail

import (
	"fmt"
	"log"
	"strings"
	"sync"

	"github.com/influxdata/tail"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal/globpath"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
)

const (
	defaultWatchMethod = "inotify"
)
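
// Tail streams lines from a set of files (or named pipes), parses each line
// with the configured data format, and adds the resulting metrics to the
// accumulator.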
type Tail struct {
	Files         []string
	FromBeginning bool
	Pipe          bool
	WatchMethod   string

	tailers    map[string]*tail.Tail
	parserFunc parsers.ParserFunc
	wg         sync.WaitGroup
	acc        telegraf.Accumulator

	sync.Mutex
}
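
// NewTail returns a Tail input with its default settings, tailing from the
// end of each file rather than from the beginning.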
func NewTail() *Tail {
	return &Tail{
		FromBeginning: false,
	}
}

const sampleConfig = `
  ## files to tail.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk". ie:
  ##   "/var/log/**.log"     -> recursively find all .log files in /var/log
  ##   "/var/log/*/*.log"    -> find all .log files with a parent dir in /var/log
  ##   "/var/log/apache.log" -> just tail the apache log file
  ##
  ## See https://github.com/gobwas/glob for more examples
  ##
  files = ["/var/mymetrics.out"]

  ## Read file from beginning.
  from_beginning = false

  ## Whether file is a named pipe
  pipe = false

  ## Method used to watch for file updates. Can be either "inotify" or "poll".
  # watch_method = "inotify"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`
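
// SampleConfig returns the default configuration stanza for the tail input.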
func (t *Tail) SampleConfig() string {
	return sampleConfig
}
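
// Description returns a one-line summary of the plugin.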
func (t *Tail) Description() string {
	return "Stream a log file, like the tail -f command"
}
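
// Gather does not collect metrics itself (lines are pushed to the accumulator
// by the receiver goroutines); it only checks the globs for new files to tail.
// Files discovered here are read from the beginning.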
func (t *Tail) Gather(acc telegraf.Accumulator) error {
	t.Lock()
	defer t.Unlock()

	return t.tailNewFiles(true)
}
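
// Start compiles the file globs and begins tailing every matched file,
// honoring the from_beginning setting for this initial set of files.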
func (t *Tail) Start(acc telegraf.Accumulator) error {
	t.Lock()
	defer t.Unlock()

	t.acc = acc
	t.tailers = make(map[string]*tail.Tail)

	return t.tailNewFiles(t.FromBeginning)
}
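
// tailNewFiles starts a tailer and a receiver goroutine for every file that
// matches the configured globs and is not already being tailed.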
func (t *Tail) tailNewFiles(fromBeginning bool) error {
	var seek *tail.SeekInfo
	if !t.Pipe && !fromBeginning {
		// seek to the end of the file before following it
		seek = &tail.SeekInfo{
			Whence: 2,
			Offset: 0,
		}
	}

	var poll bool
	if t.WatchMethod == "poll" {
		poll = true
	}

	// Create a "tailer" for each file
	for _, filepath := range t.Files {
		g, err := globpath.Compile(filepath)
		if err != nil {
			t.acc.AddError(fmt.Errorf("glob %s failed to compile: %s", filepath, err))
			continue
		}

		for _, file := range g.Match() {
			if _, ok := t.tailers[file]; ok {
				// we're already tailing this file
				continue
			}

			// build the parser before starting the tailer so a parser error
			// does not leave an orphaned tailer behind
			parser, err := t.parserFunc()
			if err != nil {
				t.acc.AddError(fmt.Errorf("error creating parser: %v", err))
				continue
			}

			tailer, err := tail.TailFile(file,
				tail.Config{
					ReOpen:    true,
					Follow:    true,
					Location:  seek,
					MustExist: true,
					Poll:      poll,
					Pipe:      t.Pipe,
					Logger:    tail.DiscardingLogger,
				})
			if err != nil {
				t.acc.AddError(err)
				continue
			}
			log.Printf("D! [inputs.tail] tail added for file: %v", file)

			// create a goroutine for each "tailer"
			t.wg.Add(1)
			go t.receiver(parser, tailer)
			t.tailers[tailer.Filename] = tailer
		}
	}
	return nil
}

// this is launched as a goroutine to continuously watch a tailed logfile
// for changes, parse any incoming msgs, and add to the accumulator.
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
	defer t.wg.Done()

	var firstLine = true
	var metrics []telegraf.Metric
	var m telegraf.Metric
	var err error

	for line := range tailer.Lines {
		if line.Err != nil {
			t.acc.AddError(fmt.Errorf("error tailing file %s: %s",
				tailer.Filename, line.Err))
			continue
		}
		// Fix up files with Windows line endings.
		text := strings.TrimRight(line.Text, "\r")

		if firstLine {
			// The first line is run through Parse so parsers that keep state
			// (a header line, for example) can initialize; subsequent lines
			// use ParseLine.
			metrics, err = parser.Parse([]byte(text))
			if err == nil {
				if len(metrics) == 0 {
					firstLine = false
					continue
				} else {
					m = metrics[0]
				}
			}
			firstLine = false
		} else {
			m, err = parser.ParseLine(text)
		}

		if err == nil {
			if m != nil {
				tags := m.Tags()
				tags["path"] = tailer.Filename
				t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
			}
		} else {
			t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s]: %s",
				tailer.Filename, line.Text, err))
		}
	}

	log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)

	if err := tailer.Err(); err != nil {
		t.acc.AddError(fmt.Errorf("error tailing file %s: %s",
			tailer.Filename, err))
	}
}
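
// Stop ends all tailers and waits for their receiver goroutines to finish.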
func (t *Tail) Stop() {
	t.Lock()
	defer t.Unlock()

	for _, tailer := range t.tailers {
		err := tailer.Stop()
		if err != nil {
			t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename))
		}
	}

	// Cleanup releases the file-watch resources held by the tail library;
	// it runs in a second pass once every tailer has been told to stop.
	for _, tailer := range t.tailers {
		tailer.Cleanup()
	}
	t.wg.Wait()
}
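
// SetParserFunc sets the factory used to build a parser for each tailed file.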
func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
	t.parserFunc = fn
}

func init() {
	inputs.Add("tail", func() telegraf.Input {
		return NewTail()
	})
}
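
// A minimal usage sketch (the file path is illustrative): once registered,
// the plugin is enabled from telegraf.conf with a stanza such as
//
//	[[inputs.tail]]
//	  files = ["/var/mymetrics.out"]
//	  data_format = "influx"
//
// which streams influx-formatted lines appended to /var/mymetrics.out.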