diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md index 27cb6418e..e9f9cc8cb 100644 --- a/plugins/inputs/tail/README.md +++ b/plugins/inputs/tail/README.md @@ -19,12 +19,11 @@ see http://man7.org/linux/man-pages/man1/tail.1.html for more details. The plugin expects messages in one of the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). -### Configuration: +### Configuration ```toml -# Stream a log file, like the tail -f command [[inputs.tail]] - ## files to tail. + ## File names or a pattern to tail. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: ## "/var/log/**.log" -> recursively find all .log files in /var/log @@ -34,14 +33,21 @@ The plugin expects messages in one of the ## See https://github.com/gobwas/glob for more examples ## files = ["/var/mymetrics.out"] + ## Read file from beginning. - from_beginning = false + # from_beginning = false + ## Whether file is a named pipe - pipe = false + # pipe = false ## Method used to watch for file updates. Can be either "inotify" or "poll". # watch_method = "inotify" + ## Maximum lines of the file to process that have not yet be written by the + ## output. For best throughput set based on the number of metrics on each + ## line and the size of the output's metric_batch_size. + # max_undelivered_lines = 1000 + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -49,7 +55,7 @@ The plugin expects messages in one of the data_format = "influx" ``` -### Metrics: +### Metrics Metrics are produced according to the `data_format` option. Additionally a tag labeled `path` is added to the metric containing the filename being tailed. diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index db4d56424..9e7d6ecf1 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -3,6 +3,8 @@ package tail import ( + "context" + "errors" "strings" "sync" @@ -15,7 +17,8 @@ import ( ) const ( - defaultWatchMethod = "inotify" + defaultWatchMethod = "inotify" + defaultMaxUndeliveredLines = 1000 ) var ( @@ -23,21 +26,25 @@ var ( offsetsMutex = new(sync.Mutex) ) +type empty struct{} +type semaphore chan empty + type Tail struct { - Files []string - FromBeginning bool - Pipe bool - WatchMethod string - - Log telegraf.Logger + Files []string `toml:"files"` + FromBeginning bool `toml:"from_beginning"` + Pipe bool `toml:"pipe"` + WatchMethod string `toml:"watch_method"` + MaxUndeliveredLines int `toml:"max_undelivered_lines"` + Log telegraf.Logger `toml:"-"` tailers map[string]*tail.Tail offsets map[string]int64 parserFunc parsers.ParserFunc wg sync.WaitGroup - acc telegraf.Accumulator - - sync.Mutex + ctx context.Context + cancel context.CancelFunc + acc telegraf.TrackingAccumulator + sem semaphore } func NewTail() *Tail { @@ -49,13 +56,14 @@ func NewTail() *Tail { offsetsMutex.Unlock() return &Tail{ - FromBeginning: false, - offsets: offsetsCopy, + FromBeginning: false, + MaxUndeliveredLines: 1000, + offsets: offsetsCopy, } } const sampleConfig = ` - ## files to tail. + ## File names or a pattern to tail. ## These accept standard unix glob matching rules, but with the addition of ## ** as a "super asterisk". ie: ## "/var/log/**.log" -> recursively find all .log files in /var/log @@ -65,14 +73,21 @@ const sampleConfig = ` ## See https://github.com/gobwas/glob for more examples ## files = ["/var/mymetrics.out"] + ## Read file from beginning. - from_beginning = false + # from_beginning = false + ## Whether file is a named pipe - pipe = false + # pipe = false ## Method used to watch for file updates. Can be either "inotify" or "poll". # watch_method = "inotify" + ## Maximum lines of the file to process that have not yet be written by the + ## output. For best throughput set based on the number of metrics on each + ## line and the size of the output's metric_batch_size. + # max_undelivered_lines = 1000 + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -88,18 +103,36 @@ func (t *Tail) Description() string { return "Stream a log file, like the tail -f command" } -func (t *Tail) Gather(acc telegraf.Accumulator) error { - t.Lock() - defer t.Unlock() +func (t *Tail) Init() error { + if t.MaxUndeliveredLines == 0 { + return errors.New("max_undelivered_lines must be positive") + } + t.sem = make(semaphore, t.MaxUndeliveredLines) + return nil +} +func (t *Tail) Gather(acc telegraf.Accumulator) error { return t.tailNewFiles(true) } func (t *Tail) Start(acc telegraf.Accumulator) error { - t.Lock() - defer t.Unlock() + t.acc = acc.WithTracking(t.MaxUndeliveredLines) + + t.ctx, t.cancel = context.WithCancel(context.Background()) + + t.wg.Add(1) + go func() { + defer t.wg.Done() + for { + select { + case <-t.ctx.Done(): + return + case <-t.acc.Delivered(): + <-t.sem + } + } + }() - t.acc = acc t.tailers = make(map[string]*tail.Tail) err := t.tailNewFiles(t.FromBeginning) @@ -175,6 +208,12 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { go func() { defer t.wg.Done() t.receiver(parser, tailer) + + t.Log.Debugf("Tail removed for %q", tailer.Filename) + + if err := tailer.Err(); err != nil { + t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error()) + } }() t.tailers[tailer.Filename] = tailer } @@ -229,21 +268,19 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { for _, metric := range metrics { metric.AddTag("path", tailer.Filename) - t.acc.AddMetric(metric) } - } - t.Log.Debugf("Tail removed for %q", tailer.Filename) - - if err := tailer.Err(); err != nil { - t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error()) + // Block until plugin is stopping or room is available to add metrics. + select { + case <-t.ctx.Done(): + return + case t.sem <- empty{}: + t.acc.AddTrackingMetricGroup(metrics) + } } } func (t *Tail) Stop() { - t.Lock() - defer t.Unlock() - for _, tailer := range t.tailers { if !t.Pipe && !t.FromBeginning { // store offset for resume @@ -260,6 +297,7 @@ func (t *Tail) Stop() { } } + t.cancel() t.wg.Wait() // persist offsets diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 4b96e092f..88d63f723 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -26,6 +26,7 @@ func TestTailFromBeginning(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) + defer tmpfile.Close() _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") require.NoError(t, err) @@ -34,11 +35,13 @@ func TestTailFromBeginning(t *testing.T) { tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(parsers.NewInfluxParser) - defer tt.Stop() - defer tmpfile.Close() + + err = tt.Init() + require.NoError(t, err) acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) + defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(1) @@ -60,6 +63,7 @@ func TestTailFromEnd(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) + defer tmpfile.Close() _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") require.NoError(t, err) @@ -67,11 +71,13 @@ func TestTailFromEnd(t *testing.T) { tt.Log = testutil.Logger{} tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(parsers.NewInfluxParser) - defer tt.Stop() - defer tmpfile.Close() + + err = tt.Init() + require.NoError(t, err) acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) + defer tt.Stop() for _, tailer := range tt.tailers { for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() { // wait for tailer to jump to end @@ -99,17 +105,20 @@ func TestTailBadLine(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) + defer tmpfile.Close() tt := NewTail() tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(parsers.NewInfluxParser) - defer tt.Stop() - defer tmpfile.Close() + + err = tt.Init() + require.NoError(t, err) acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) + defer tt.Stop() buf := &bytes.Buffer{} log.SetOutput(buf) @@ -127,6 +136,7 @@ func TestTailDosLineendings(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) + defer tmpfile.Close() _, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n") require.NoError(t, err) @@ -135,11 +145,13 @@ func TestTailDosLineendings(t *testing.T) { tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(parsers.NewInfluxParser) - defer tt.Stop() - defer tmpfile.Close() + + err = tt.Init() + require.NoError(t, err) acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) + defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(2) @@ -180,11 +192,14 @@ cpu,42 TimeFunc: func() time.Time { return time.Unix(0, 0) }, }, nil }) - defer plugin.Stop() + + err = plugin.Init() + require.NoError(t, err) acc := testutil.Accumulator{} err = plugin.Start(&acc) require.NoError(t, err) + defer plugin.Stop() err = plugin.Gather(&acc) require.NoError(t, err) acc.Wait(2) @@ -237,11 +252,14 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) { MetricName: "cpu", }) }) - defer plugin.Stop() + + err = plugin.Init() + require.NoError(t, err) acc := testutil.Accumulator{} err = plugin.Start(&acc) require.NoError(t, err) + defer plugin.Stop() err = plugin.Gather(&acc) require.NoError(t, err) acc.Wait(2)