254 lines
5.9 KiB
Go
254 lines
5.9 KiB
Go
package logparser
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"sync"
|
|
|
|
"github.com/hpcloud/tail"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal/errchan"
|
|
"github.com/influxdata/telegraf/internal/globpath"
|
|
"github.com/influxdata/telegraf/registry/inputs"
|
|
|
|
// Parsers
|
|
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
|
|
)
|
|
|
|
type LogParser interface {
|
|
ParseLine(line string) (telegraf.Metric, error)
|
|
Compile() error
|
|
}
|
|
|
|
type LogParserPlugin struct {
|
|
Files []string
|
|
FromBeginning bool
|
|
|
|
tailers map[string]*tail.Tail
|
|
lines chan string
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
acc telegraf.Accumulator
|
|
parsers []LogParser
|
|
|
|
sync.Mutex
|
|
|
|
GrokParser *grok.Parser `toml:"grok"`
|
|
}
|
|
|
|
// sampleConfig is the example TOML configuration returned by SampleConfig.
const sampleConfig = `
## Log files to parse.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false

## Parse logstash-style "grok" patterns:
## Telegraf built-in parsing patterns: https://goo.gl/dkay10
[inputs.logparser.grok]
## This is a list of patterns to check the given log file(s) for.
## Note that adding patterns here increases processing time. The most
## efficient configuration is to have one pattern per logparser.
## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name.
measurement = "apache_access_log"
## Full path(s) to custom pattern files.
custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line.
custom_patterns = '''
'''
`
|
|
|
|
func (l *LogParserPlugin) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (l *LogParserPlugin) Description() string {
|
|
return "Stream and parse log file(s)."
|
|
}
|
|
|
|
func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
|
|
l.Lock()
|
|
defer l.Unlock()
|
|
|
|
// always start from the beginning of files that appear while we're running
|
|
return l.tailNewfiles(true)
|
|
}
|
|
|
|
func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
|
|
l.Lock()
|
|
defer l.Unlock()
|
|
|
|
l.acc = acc
|
|
l.lines = make(chan string, 1000)
|
|
l.done = make(chan struct{})
|
|
l.tailers = make(map[string]*tail.Tail)
|
|
|
|
// Looks for fields which implement LogParser interface
|
|
l.parsers = []LogParser{}
|
|
s := reflect.ValueOf(l).Elem()
|
|
for i := 0; i < s.NumField(); i++ {
|
|
f := s.Field(i)
|
|
|
|
if !f.CanInterface() {
|
|
continue
|
|
}
|
|
|
|
if lpPlugin, ok := f.Interface().(LogParser); ok {
|
|
if reflect.ValueOf(lpPlugin).IsNil() {
|
|
continue
|
|
}
|
|
l.parsers = append(l.parsers, lpPlugin)
|
|
}
|
|
}
|
|
|
|
if len(l.parsers) == 0 {
|
|
return fmt.Errorf("ERROR: logparser input plugin: no parser defined.")
|
|
}
|
|
|
|
// compile log parser patterns:
|
|
errChan := errchan.New(len(l.parsers))
|
|
for _, parser := range l.parsers {
|
|
if err := parser.Compile(); err != nil {
|
|
errChan.C <- err
|
|
}
|
|
}
|
|
if err := errChan.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
l.wg.Add(1)
|
|
go l.parser()
|
|
|
|
return l.tailNewfiles(l.FromBeginning)
|
|
}
|
|
|
|
// check the globs against files on disk, and start tailing any new files.
|
|
// Assumes l's lock is held!
|
|
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
|
|
var seek tail.SeekInfo
|
|
if !fromBeginning {
|
|
seek.Whence = 2
|
|
seek.Offset = 0
|
|
}
|
|
|
|
errChan := errchan.New(len(l.Files))
|
|
|
|
// Create a "tailer" for each file
|
|
for _, filepath := range l.Files {
|
|
g, err := globpath.Compile(filepath)
|
|
if err != nil {
|
|
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
|
|
continue
|
|
}
|
|
files := g.Match()
|
|
errChan = errchan.New(len(files))
|
|
|
|
for file, _ := range files {
|
|
if _, ok := l.tailers[file]; ok {
|
|
// we're already tailing this file
|
|
continue
|
|
}
|
|
|
|
tailer, err := tail.TailFile(file,
|
|
tail.Config{
|
|
ReOpen: true,
|
|
Follow: true,
|
|
Location: &seek,
|
|
MustExist: true,
|
|
})
|
|
errChan.C <- err
|
|
|
|
// create a goroutine for each "tailer"
|
|
l.wg.Add(1)
|
|
go l.receiver(tailer)
|
|
l.tailers[file] = tailer
|
|
}
|
|
}
|
|
|
|
return errChan.Error()
|
|
}
|
|
|
|
// receiver is launched as a goroutine to continuously watch a tailed logfile
|
|
// for changes and send any log lines down the l.lines channel.
|
|
func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
|
|
defer l.wg.Done()
|
|
|
|
var line *tail.Line
|
|
for line = range tailer.Lines {
|
|
|
|
if line.Err != nil {
|
|
log.Printf("E! Error tailing file %s, Error: %s\n",
|
|
tailer.Filename, line.Err)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-l.done:
|
|
case l.lines <- line.Text:
|
|
}
|
|
}
|
|
}
|
|
|
|
// parser is launched as a goroutine to watch the l.lines channel.
|
|
// when a line is available, parser parses it and adds the metric(s) to the
|
|
// accumulator.
|
|
func (l *LogParserPlugin) parser() {
|
|
defer l.wg.Done()
|
|
|
|
var m telegraf.Metric
|
|
var err error
|
|
var line string
|
|
for {
|
|
select {
|
|
case <-l.done:
|
|
return
|
|
case line = <-l.lines:
|
|
if line == "" || line == "\n" {
|
|
continue
|
|
}
|
|
}
|
|
|
|
for _, parser := range l.parsers {
|
|
m, err = parser.ParseLine(line)
|
|
if err == nil {
|
|
if m != nil {
|
|
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *LogParserPlugin) Stop() {
|
|
l.Lock()
|
|
defer l.Unlock()
|
|
|
|
for _, t := range l.tailers {
|
|
err := t.Stop()
|
|
if err != nil {
|
|
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
|
|
}
|
|
t.Cleanup()
|
|
}
|
|
close(l.done)
|
|
l.wg.Wait()
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("logparser", func() telegraf.Input {
|
|
return &LogParserPlugin{}
|
|
})
|
|
}
|