Add limit to number of undelivered lines to read ahead in tail (#7210)

This commit is contained in:
Daniel Nelson 2020-03-27 15:40:08 -07:00 committed by GitHub
parent 608e818645
commit 9a1c26d6cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 46 deletions

View File

@ -19,12 +19,11 @@ see http://man7.org/linux/man-pages/man1/tail.1.html for more details.
The plugin expects messages in one of the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
### Configuration:
### Configuration
```toml
# Stream a log file, like the tail -f command
[[inputs.tail]]
## files to tail.
## File names or a pattern to tail.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## "/var/log/**.log" -> recursively find all .log files in /var/log
@ -34,14 +33,21 @@ The plugin expects messages in one of the
## See https://github.com/gobwas/glob for more examples
##
files = ["/var/mymetrics.out"]
## Read file from beginning.
from_beginning = false
# from_beginning = false
## Whether file is a named pipe
pipe = false
# pipe = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Maximum lines of the file to process that have not yet been written by the
## output. For best throughput set based on the number of metrics on each
## line and the size of the output's metric_batch_size.
# max_undelivered_lines = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
@ -49,7 +55,7 @@ The plugin expects messages in one of the
data_format = "influx"
```
### Metrics:
### Metrics
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@ -3,6 +3,8 @@
package tail
import (
"context"
"errors"
"strings"
"sync"
@ -16,6 +18,7 @@ import (
const (
defaultWatchMethod = "inotify"
defaultMaxUndeliveredLines = 1000
)
var (
@ -23,21 +26,25 @@ var (
offsetsMutex = new(sync.Mutex)
)
type empty struct{}
type semaphore chan empty
type Tail struct {
Files []string
FromBeginning bool
Pipe bool
WatchMethod string
Log telegraf.Logger
Files []string `toml:"files"`
FromBeginning bool `toml:"from_beginning"`
Pipe bool `toml:"pipe"`
WatchMethod string `toml:"watch_method"`
MaxUndeliveredLines int `toml:"max_undelivered_lines"`
Log telegraf.Logger `toml:"-"`
tailers map[string]*tail.Tail
offsets map[string]int64
parserFunc parsers.ParserFunc
wg sync.WaitGroup
acc telegraf.Accumulator
sync.Mutex
ctx context.Context
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem semaphore
}
func NewTail() *Tail {
@ -50,12 +57,13 @@ func NewTail() *Tail {
return &Tail{
FromBeginning: false,
MaxUndeliveredLines: 1000,
offsets: offsetsCopy,
}
}
const sampleConfig = `
## files to tail.
## File names or a pattern to tail.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## "/var/log/**.log" -> recursively find all .log files in /var/log
@ -65,14 +73,21 @@ const sampleConfig = `
## See https://github.com/gobwas/glob for more examples
##
files = ["/var/mymetrics.out"]
## Read file from beginning.
from_beginning = false
# from_beginning = false
## Whether file is a named pipe
pipe = false
# pipe = false
## Method used to watch for file updates. Can be either "inotify" or "poll".
# watch_method = "inotify"
## Maximum lines of the file to process that have not yet been written by the
## output. For best throughput set based on the number of metrics on each
## line and the size of the output's metric_batch_size.
# max_undelivered_lines = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
@ -88,18 +103,36 @@ func (t *Tail) Description() string {
return "Stream a log file, like the tail -f command"
}
func (t *Tail) Gather(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
func (t *Tail) Init() error {
if t.MaxUndeliveredLines == 0 {
return errors.New("max_undelivered_lines must be positive")
}
t.sem = make(semaphore, t.MaxUndeliveredLines)
return nil
}
func (t *Tail) Gather(acc telegraf.Accumulator) error {
return t.tailNewFiles(true)
}
func (t *Tail) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
t.acc = acc.WithTracking(t.MaxUndeliveredLines)
t.ctx, t.cancel = context.WithCancel(context.Background())
t.wg.Add(1)
go func() {
defer t.wg.Done()
for {
select {
case <-t.ctx.Done():
return
case <-t.acc.Delivered():
<-t.sem
}
}
}()
t.acc = acc
t.tailers = make(map[string]*tail.Tail)
err := t.tailNewFiles(t.FromBeginning)
@ -175,6 +208,12 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
go func() {
defer t.wg.Done()
t.receiver(parser, tailer)
t.Log.Debugf("Tail removed for %q", tailer.Filename)
if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
}
}()
t.tailers[tailer.Filename] = tailer
}
@ -229,21 +268,19 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
for _, metric := range metrics {
metric.AddTag("path", tailer.Filename)
t.acc.AddMetric(metric)
}
}
t.Log.Debugf("Tail removed for %q", tailer.Filename)
if err := tailer.Err(); err != nil {
t.Log.Errorf("Tailing %q: %s", tailer.Filename, err.Error())
// Block until plugin is stopping or room is available to add metrics.
select {
case <-t.ctx.Done():
return
case t.sem <- empty{}:
t.acc.AddTrackingMetricGroup(metrics)
}
}
}
func (t *Tail) Stop() {
t.Lock()
defer t.Unlock()
for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
// store offset for resume
@ -260,6 +297,7 @@ func (t *Tail) Stop() {
}
}
t.cancel()
t.wg.Wait()
// persist offsets

View File

@ -26,6 +26,7 @@ func TestTailFromBeginning(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
@ -34,11 +35,13 @@ func TestTailFromBeginning(t *testing.T) {
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()
err = tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1)
@ -60,6 +63,7 @@ func TestTailFromEnd(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
@ -67,11 +71,13 @@ func TestTailFromEnd(t *testing.T) {
tt.Log = testutil.Logger{}
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()
err = tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
for _, tailer := range tt.tailers {
for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
// wait for tailer to jump to end
@ -99,17 +105,20 @@ func TestTailBadLine(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()
err = tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
buf := &bytes.Buffer{}
log.SetOutput(buf)
@ -127,6 +136,7 @@ func TestTailDosLineendings(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
require.NoError(t, err)
@ -135,11 +145,13 @@ func TestTailDosLineendings(t *testing.T) {
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
defer tt.Stop()
defer tmpfile.Close()
err = tt.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(2)
@ -180,11 +192,14 @@ cpu,42
TimeFunc: func() time.Time { return time.Unix(0, 0) },
}, nil
})
defer plugin.Stop()
err = plugin.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
@ -237,11 +252,14 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) {
MetricName: "cpu",
})
})
defer plugin.Stop()
err = plugin.Init()
require.NoError(t, err)
acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)