diff --git a/Godeps b/Godeps index 926adcb74..9dd6a83ed 100644 --- a/Godeps +++ b/Godeps @@ -23,6 +23,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 +github.com/hpcloud/tail b2940955ab8b26e19d43a43c4da0475dd81bdb56 github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da github.com/influxdata/influxdb 21db76b3374c733f37ed16ad93f3484020034351 github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 36526f4d1..93ea3e779 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -56,6 +56,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ "github.com/influxdata/telegraf/plugins/inputs/system" + _ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" diff --git a/plugins/inputs/filestat/filestat.go b/plugins/inputs/filestat/filestat.go index 831d37444..938c12e34 100644 --- a/plugins/inputs/filestat/filestat.go +++ b/plugins/inputs/filestat/filestat.go @@ -14,8 +14,14 @@ import ( const sampleConfig = ` ## Files to gather stats about. ## These accept standard unix glob matching rules, but with the addition of - ## ** as a "super asterisk". See https://github.com/gobwas/glob. - files = ["/etc/telegraf/telegraf.conf", "/var/log/**.log"] + ## ** as a "super asterisk". ie: + ## "/var/log/**.log" -> recursively find all .log files in /var/log + ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log + ## "/var/log/apache.log" -> just tail the apache log file + ## + ## See https://github.com/gobwas/glob for more examples + ## + files = ["/var/log/**.log"] ## If true, read the entire file and calculate an md5 checksum. md5 = false ` diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md new file mode 100644 index 000000000..3b1c50665 --- /dev/null +++ b/plugins/inputs/tail/README.md @@ -0,0 +1,29 @@ +# tail Input Plugin + +The tail plugin "tails" a logfile and parses each log message. + +By default, the tail plugin acts like the following unix tail command: + +``` +tail --follow=name --lines=0 --retry myfile.log +``` + +- `--follow=name` means that it will follow the _name_ of the given file, so +that it will be compatible with log-rotated files. +- `--lines=0` means that it will start at the end of the file (unless +the `from_beginning` option is set). +- `--retry` means it will retry on inaccessible files. + +see http://man7.org/linux/man-pages/man1/tail.1.html for more details. + +The plugin expects messages in one of the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +### Configuration: + +```toml +# Stream a log file, like the tail -f command +[[inputs.tail]] + # SampleConfig +``` + diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go new file mode 100644 index 000000000..cb99eff61 --- /dev/null +++ b/plugins/inputs/tail/tail.go @@ -0,0 +1,152 @@ +package tail + +import ( + "fmt" + "log" + "sync" + + "github.com/hpcloud/tail" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type Tail struct { + Files []string + FromBeginning bool + + tailers []*tail.Tail + parser parsers.Parser + wg sync.WaitGroup + acc telegraf.Accumulator + + sync.Mutex +} + +const sampleConfig = ` + ## files to tail. + ## These accept standard unix glob matching rules, but with the addition of + ## ** as a "super asterisk". ie: + ## "/var/log/**.log" -> recursively find all .log files in /var/log + ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log + ## "/var/log/apache.log" -> just tail the apache log file + ## + ## See https://github.com/gobwas/glob for more examples + ## + files = ["/var/mymetrics.out"] + ## Read file from beginning. + from_beginning = false + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (t *Tail) SampleConfig() string { + return sampleConfig +} + +func (t *Tail) Description() string { + return "Stream a log file, like the tail -f command" +} + +func (t *Tail) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (t *Tail) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + + var seek tail.SeekInfo + if !t.FromBeginning { + seek.Whence = 2 + seek.Offset = 0 + } + + var errS string + // Create a "tailer" for each file + for _, filepath := range t.Files { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + } + for file, _ := range g.Match() { + tailer, err := tail.TailFile(file, + tail.Config{ + ReOpen: true, + Follow: true, + Location: &seek, + }) + if err != nil { + errS += err.Error() + " " + continue + } + // create a goroutine for each "tailer" + go t.receiver(tailer) + t.tailers = append(t.tailers, tailer) + } + } + + if errS != "" { + return fmt.Errorf(errS) + } + return nil +} + +// this is launched as a goroutine to continuously watch a tailed logfile +// for changes, parse any incoming msgs, and add to the accumulator. +func (t *Tail) receiver(tailer *tail.Tail) { + t.wg.Add(1) + defer t.wg.Done() + + var m telegraf.Metric + var err error + var line *tail.Line + for line = range tailer.Lines { + if line.Err != nil { + log.Printf("ERROR tailing file %s, Error: %s\n", + tailer.Filename, err) + continue + } + m, err = t.parser.ParseLine(line.Text) + if err == nil { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } else { + log.Printf("Malformed log line in %s: [%s], Error: %s\n", + tailer.Filename, line.Text, err) + } + } +} + +func (t *Tail) Stop() { + t.Lock() + defer t.Unlock() + + for _, t := range t.tailers { + err := t.Stop() + if err != nil { + log.Printf("ERROR stopping tail on file %s\n", t.Filename) + } + t.Cleanup() + } + t.wg.Wait() +} + +func (t *Tail) SetParser(parser parsers.Parser) { + t.parser = parser +} + +func init() { + inputs.Add("tail", func() telegraf.Input { + return &Tail{ + FromBeginning: false, + } + }) +} diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go new file mode 100644 index 000000000..78e2dd578 --- /dev/null +++ b/plugins/inputs/tail/tail_test.go @@ -0,0 +1 @@ +package tail diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index ce07a7601..a420ed759 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -221,7 +221,10 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { // tcpParser parses the incoming tcp byte packets func (t *TcpListener) tcpParser() error { defer t.wg.Done() + var packet []byte + var metrics []telegraf.Metric + var err error for { select { case <-t.done: @@ -230,7 +233,7 @@ func (t *TcpListener) tcpParser() error { if len(packet) == 0 { continue } - metrics, err := t.parser.Parse(packet) + metrics, err = t.parser.Parse(packet) if err == nil { t.storeMetrics(metrics) } else { diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 39249de37..8e2637ce7 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -135,12 +135,14 @@ func (u *UdpListener) udpParser() error { defer u.wg.Done() var packet []byte + var metrics []telegraf.Metric + var err error for { select { case <-u.done: return nil case packet = <-u.in: - metrics, err := u.parser.Parse(packet) + metrics, err = u.parser.Parse(packet) if err == nil { u.storeMetrics(metrics) } else {