Add polling method to logparser and tail inputs (#3213)

This commit is contained in:
Daniel Nelson 2017-09-11 11:56:04 -07:00 committed by GitHub
parent cb40972635
commit b06e2a0c3d
4 changed files with 42 additions and 6 deletions

View File

@ -21,6 +21,9 @@ regex patterns.
## be read from the beginning. ## be read from the beginning.
from_beginning = false from_beginning = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Parse logstash-style "grok" patterns: ## Parse logstash-style "grok" patterns:
## Telegraf built-in parsing patterns: https://goo.gl/dkay10 ## Telegraf built-in parsing patterns: https://goo.gl/dkay10
[inputs.logparser.grok] [inputs.logparser.grok]

View File

@ -19,6 +19,10 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/logparser/grok" "github.com/influxdata/telegraf/plugins/inputs/logparser/grok"
) )
const (
defaultWatchMethod = "inotify"
)
// LogParser in the primary interface for the plugin // LogParser in the primary interface for the plugin
type LogParser interface { type LogParser interface {
ParseLine(line string) (telegraf.Metric, error) ParseLine(line string) (telegraf.Metric, error)
@ -34,6 +38,7 @@ type logEntry struct {
type LogParserPlugin struct { type LogParserPlugin struct {
Files []string Files []string
FromBeginning bool FromBeginning bool
WatchMethod string
tailers map[string]*tail.Tail tailers map[string]*tail.Tail
lines chan logEntry lines chan logEntry
@ -61,6 +66,9 @@ const sampleConfig = `
## be read from the beginning. ## be read from the beginning.
from_beginning = false from_beginning = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Parse logstash-style "grok" patterns: ## Parse logstash-style "grok" patterns:
## Telegraf built-in parsing patterns: https://goo.gl/dkay10 ## Telegraf built-in parsing patterns: https://goo.gl/dkay10
[inputs.logparser.grok] [inputs.logparser.grok]
@ -167,6 +175,11 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
seek.Offset = 0 seek.Offset = 0
} }
var poll bool
if l.WatchMethod == "poll" {
poll = true
}
// Create a "tailer" for each file // Create a "tailer" for each file
for _, filepath := range l.Files { for _, filepath := range l.Files {
g, err := globpath.Compile(filepath) g, err := globpath.Compile(filepath)
@ -188,6 +201,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
Follow: true, Follow: true,
Location: &seek, Location: &seek,
MustExist: true, MustExist: true,
Poll: poll,
Logger: tail.DiscardingLogger, Logger: tail.DiscardingLogger,
}) })
if err != nil { if err != nil {
@ -285,6 +299,8 @@ func (l *LogParserPlugin) Stop() {
func init() { func init() {
inputs.Add("logparser", func() telegraf.Input { inputs.Add("logparser", func() telegraf.Input {
return &LogParserPlugin{} return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
}
}) })
} }

View File

@ -39,6 +39,9 @@ The plugin expects messages in one of the
## Whether file is a named pipe ## Whether file is a named pipe
pipe = false pipe = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -15,10 +15,15 @@ import (
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
) )
const (
defaultWatchMethod = "inotify"
)
type Tail struct { type Tail struct {
Files []string Files []string
FromBeginning bool FromBeginning bool
Pipe bool Pipe bool
WatchMethod string
tailers []*tail.Tail tailers []*tail.Tail
parser parsers.Parser parser parsers.Parser
@ -50,6 +55,9 @@ const sampleConfig = `
## Whether file is a named pipe ## Whether file is a named pipe
pipe = false pipe = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:
@ -83,6 +91,11 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
} }
} }
var poll bool
if t.WatchMethod == "poll" {
poll = true
}
// Create a "tailer" for each file // Create a "tailer" for each file
for _, filepath := range t.Files { for _, filepath := range t.Files {
g, err := globpath.Compile(filepath) g, err := globpath.Compile(filepath)
@ -96,6 +109,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
Follow: true, Follow: true,
Location: seek, Location: seek,
MustExist: true, MustExist: true,
Poll: poll,
Pipe: t.Pipe, Pipe: t.Pipe,
Logger: tail.DiscardingLogger, Logger: tail.DiscardingLogger,
}) })