// +build !solaris

package tail

import (
	"strings"
	"sync"

	"github.com/influxdata/tail"
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal/globpath"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

const (
	defaultWatchMethod = "inotify"
)
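
// offsets records the last read position of each tailed file so that a plugin
// reload can resume where the previous instance stopped. The map is shared at
// package level, so all access is guarded by offsetsMutex.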
var (
	offsets      = make(map[string]int64)
	offsetsMutex = new(sync.Mutex)
)
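
// Tail streams lines from the configured files (or named pipes) and parses
// each line into metrics with the configured data format parser.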
type Tail struct {
	Files         []string
	FromBeginning bool
	Pipe          bool
	WatchMethod   string

	Log telegraf.Logger

	tailers    map[string]*tail.Tail
	offsets    map[string]int64
	parserFunc parsers.ParserFunc
	wg         sync.WaitGroup
	acc        telegraf.Accumulator

	sync.Mutex
}
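
// NewTail returns a Tail seeded with a copy of the globally persisted offsets,
// so a restarted instance can resume tailing where the previous one stopped.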
func NewTail() *Tail {
	offsetsMutex.Lock()
	offsetsCopy := make(map[string]int64, len(offsets))
	for k, v := range offsets {
		offsetsCopy[k] = v
	}
	offsetsMutex.Unlock()

	return &Tail{
		FromBeginning: false,
		offsets:       offsetsCopy,
	}
}

const sampleConfig = `
  ## files to tail.
  ## These accept standard unix glob matching rules, but with the addition of
  ## ** as a "super asterisk". ie:
  ##   "/var/log/**.log"  -> recursively find all .log files in /var/log
  ##   "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
  ##   "/var/log/apache.log" -> just tail the apache log file
  ##
  ## See https://github.com/gobwas/glob for more examples
  ##
  files = ["/var/mymetrics.out"]

  ## Read file from beginning.
  from_beginning = false

  ## Whether file is a named pipe
  pipe = false

  ## Method used to watch for file updates. Can be either "inotify" or "poll".
  # watch_method = "inotify"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`

func (t *Tail) SampleConfig() string {
	return sampleConfig
}

func (t *Tail) Description() string {
	return "Stream a log file, like the tail -f command"
}
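
// Gather is called on each collection interval; it only scans the globs for
// files that have appeared since Start and begins tailing them from the
// beginning. Metrics from already-tailed files arrive via the receiver
// goroutines, not here.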
func (t *Tail) Gather(acc telegraf.Accumulator) error {
	t.Lock()
	defer t.Unlock()

	return t.tailNewFiles(true)
}
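
// Start begins tailing every file that currently matches the configured globs
// and then clears the saved offsets, which have been consumed at this point.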
func (t *Tail) Start(acc telegraf.Accumulator) error {
	t.Lock()
	defer t.Unlock()

	t.acc = acc
	t.tailers = make(map[string]*tail.Tail)

	err := t.tailNewFiles(t.FromBeginning)

	// clear offsets
	t.offsets = make(map[string]int64)
	// assumption that once Start is called, all parallel plugins have already been initialized
	offsetsMutex.Lock()
	offsets = make(map[string]int64)
	offsetsMutex.Unlock()

	return err
}
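
// tailNewFiles compiles the configured globs and starts a tailer plus a
// receiver goroutine for every matched file that is not already being tailed.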
func (t *Tail) tailNewFiles(fromBeginning bool) error {
	var poll bool
	if t.WatchMethod == "poll" {
		poll = true
	}

	// Create a "tailer" for each file
	for _, filepath := range t.Files {
		g, err := globpath.Compile(filepath)
		if err != nil {
			t.Log.Errorf("Glob %q failed to compile: %s", filepath, err.Error())
			continue
		}
		for _, file := range g.Match() {
			if _, ok := t.tailers[file]; ok {
				// we're already tailing this file
				continue
			}
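
			// Pick where to start reading: resume from a previously saved
			// offset when one exists, otherwise seek to the end of the file
			// (whence 2) so only new lines are consumed.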
			var seek *tail.SeekInfo
			if !t.Pipe && !fromBeginning {
				if offset, ok := t.offsets[file]; ok {
					t.Log.Debugf("Using offset %d for %q", offset, file)
					seek = &tail.SeekInfo{
						Whence: 0,
						Offset: offset,
					}
				} else {
					seek = &tail.SeekInfo{
						Whence: 2,
						Offset: 0,
					}
				}
			}

			tailer, err := tail.TailFile(file,
				tail.Config{
					ReOpen:    true,
					Follow:    true,
					Location:  seek,
					MustExist: true,
					Poll:      poll,
					Pipe:      t.Pipe,
					Logger:    tail.DiscardingLogger,
				})
			if err != nil {
				t.acc.AddError(err)
				continue
			}

			t.Log.Debugf("Tail added for %q", file)

			parser, err := t.parserFunc()
			if err != nil {
				t.Log.Errorf("Creating parser: %s", err.Error())
			}

			// create a goroutine for each "tailer"
			t.wg.Add(1)
			go func() {
				defer t.wg.Done()
				t.receiver(parser, tailer)
			}()
			t.tailers[tailer.Filename] = tailer
		}
	}
	return nil
}

// parseLine parses a single line of text into metrics.
func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) {
	switch parser.(type) {
	case *csv.Parser:
		// The csv parser parses headers in Parse and skips them in ParseLine.
		// As a temporary solution call Parse only when getting the first
		// line from the file.
		if firstLine {
			return parser.Parse([]byte(line))
		}

		m, err := parser.ParseLine(line)
		if err != nil {
			return nil, err
		}

		if m != nil {
			return []telegraf.Metric{m}, nil
		}
		return []telegraf.Metric{}, nil
	default:
		return parser.Parse([]byte(line))
	}
}

// receiver is launched as a goroutine to continuously watch a tailed logfile
// for changes, parse any incoming messages, and add them to the accumulator.
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
	var firstLine = true
	for line := range tailer.Lines {
		if line.Err != nil {
			t.Log.Errorf("Tailing %q: %s", tailer.Filename, line.Err.Error())
			continue
		}
		// Fix up files with Windows line endings.
		text := strings.TrimRight(line.Text, "\r")

		metrics, err := parseLine(parser, text, firstLine)
		if err != nil {
			t.Log.Errorf("Malformed log line in %q: [%q]: %s",
				tailer.Filename, line.Text, err.Error())
			continue
		}
		firstLine = false

		for _, metric := range metrics {
			metric.AddTag("path", tailer.Filename)
			t.acc.AddMetric(metric)
		}
	}

	t.Log.Debugf("Tail removed for %q", tailer.Filename)

	if err := tailer.Err(); err != nil {
		t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
	}
}
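
// Stop records the current offset of every tailer so that a later Start can
// resume, stops the tailers, waits for the receiver goroutines to drain, and
// persists the collected offsets into the package-level map.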
func (t *Tail) Stop() {
	t.Lock()
	defer t.Unlock()

	for _, tailer := range t.tailers {
		if !t.Pipe && !t.FromBeginning {
			// store offset for resume
			offset, err := tailer.Tell()
			if err == nil {
				t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename)
				t.offsets[tailer.Filename] = offset
			} else {
				t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error())
			}
		}
		err := tailer.Stop()
		if err != nil {
			t.Log.Errorf("Stopping tail on %q: %s", tailer.Filename, err.Error())
		}
	}

	t.wg.Wait()

	// persist offsets
	offsetsMutex.Lock()
	for k, v := range t.offsets {
		offsets[k] = v
	}
	offsetsMutex.Unlock()
}
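
// SetParserFunc stores the factory used to build a parser; a fresh parser is
// created for each tailed file because some parsers (e.g. csv) keep per-file
// state such as header handling.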
func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
	t.parserFunc = fn
}

func init() {
	inputs.Add("tail", func() telegraf.Input {
		return NewTail()
	})
}