From 37ae3956c18698aba7b1877ac816bea573506f28 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 2 May 2016 18:42:30 -0600 Subject: [PATCH] Space Igloo --- plugins/inputs/all/all.go | 1 + plugins/inputs/igloo/README.md | 23 +++ plugins/inputs/igloo/igloo.go | 331 +++++++++++++++++++++++++++++++++ 3 files changed, 355 insertions(+) create mode 100644 plugins/inputs/igloo/README.md create mode 100644 plugins/inputs/igloo/igloo.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 93ea3e779..c24c7b7c5 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" + _ "github.com/influxdata/telegraf/plugins/inputs/igloo" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" diff --git a/plugins/inputs/igloo/README.md b/plugins/inputs/igloo/README.md new file mode 100644 index 000000000..fe0c7cc17 --- /dev/null +++ b/plugins/inputs/igloo/README.md @@ -0,0 +1,23 @@ +# igloo Input Plugin + +The igloo plugin "tails" a logfile and parses each log message. + +By default, the igloo plugin acts like the following unix tail command: + +``` +tail -F --lines=0 myfile.log +``` + +- `-F` means that it will follow the _name_ of the given file, so +that it will be compatible with log-rotated files, and that it will retry on +inaccessible files. +- `--lines=0` means that it will start at the end of the file (unless +the `from_beginning` option is set). + +see http://man7.org/linux/man-pages/man1/tail.1.html for more details. + +### Configuration: + +```toml +``` + diff --git a/plugins/inputs/igloo/igloo.go b/plugins/inputs/igloo/igloo.go new file mode 100644 index 000000000..fd4bd7ea5 --- /dev/null +++ b/plugins/inputs/igloo/igloo.go @@ -0,0 +1,331 @@ +package igloo + +import ( + "fmt" + "log" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/hpcloud/tail" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// format of timestamps +const ( + rfcFormat string = "%s-%s-%sT%s:%s:%s.%sZ" +) + +var ( + // regex for finding timestamps + tRe = regexp.MustCompile(`Timestamp=((\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2}),(\d+))`) +) + +type Tail struct { + Files []string + FromBeginning bool + TagKeys []string + Counters []string + NumFields []string + StrFields []string + + numfieldsRe map[string]*regexp.Regexp + strfieldsRe map[string]*regexp.Regexp + countersRe map[string]*regexp.Regexp + tagsRe map[string]*regexp.Regexp + + counters map[string]map[string]int64 + + tailers []*tail.Tail + wg sync.WaitGroup + acc telegraf.Accumulator + + sync.Mutex +} + +func NewTail() *Tail { + return &Tail{ + FromBeginning: false, + } +} + +const sampleConfig = ` + ## logfiles to parse. + ## + ## These accept standard unix glob matching rules, but with the addition of + ## ** as a "super asterisk". ie: + ## "/var/log/**.log" -> recursively find all .log files in /var/log + ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log + ## "/var/log/apache.log" -> just tail the apache log file + ## + ## See https://github.com/gobwas/glob for more examples + ## + files = ["$HOME/sample.log"] + ## Read file from beginning. + from_beginning = false + + ## Each log message is searched for these tag keys in TagKey=Value format. + ## Any that are found will be tagged on the resulting influx measurements. + tag_keys = [ + "HostLocal", + "ProductName", + "OperationName", + ] + + ## counters are keys which are treated as counters. + ## so if counters = ["Result"], then this means that the following ocurrence + ## on a log line: + ## Result=Success + ## would be treated as a counter: Result_Success, and it will be incremented + ## for every occurrence, until Telegraf is restarted. + counters = ["Result"] + ## num_fields are log line occurrences that are translated into numerical + ## fields. ie: + ## Duration=1 + num_fields = ["Duration", "Attempt"] + ## str_fields are log line occurences that are translated into string fields, + ## ie: + ## ActivityGUID=0bb03bf4-ae1d-4487-bb6f-311653b35760 + str_fields = ["ActivityGUID"] +` + +func (t *Tail) SampleConfig() string { + return sampleConfig +} + +func (t *Tail) Description() string { + return "Stream an igloo file, like the tail -f command" +} + +func (t *Tail) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (t *Tail) buildRegexes() error { + t.numfieldsRe = make(map[string]*regexp.Regexp) + t.strfieldsRe = make(map[string]*regexp.Regexp) + t.tagsRe = make(map[string]*regexp.Regexp) + t.countersRe = make(map[string]*regexp.Regexp) + t.counters = make(map[string]map[string]int64) + + for _, field := range t.NumFields { + re, err := regexp.Compile(field + `=([0-9\.]+)`) + if err != nil { + return err + } + t.numfieldsRe[field] = re + } + + for _, field := range t.StrFields { + re, err := regexp.Compile(field + `=([0-9a-zA-Z\.\-]+)`) + if err != nil { + return err + } + t.strfieldsRe[field] = re + } + + for _, field := range t.TagKeys { + re, err := regexp.Compile(field + `=([0-9a-zA-Z\.\-]+)`) + if err != nil { + return err + } + t.tagsRe[field] = re + } + + for _, field := range t.Counters { + re, err := regexp.Compile("(" + field + ")" + `=([0-9a-zA-Z\.\-]+)`) + if err != nil { + return err + } + t.countersRe[field] = re + } + + return nil +} + +func (t *Tail) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + if err := t.buildRegexes(); err != nil { + return err + } + + var seek tail.SeekInfo + if !t.FromBeginning { + seek.Whence = 2 + seek.Offset = 0 + } + + var errS string + // Create a "tailer" for each file + for _, filepath := range t.Files { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + } + for file, _ := range g.Match() { + tailer, err := tail.TailFile(file, + tail.Config{ + ReOpen: true, + Follow: true, + Location: &seek, + }) + if err != nil { + errS += err.Error() + " " + continue + } + // create a goroutine for each "tailer" + go t.receiver(tailer) + t.tailers = append(t.tailers, tailer) + } + } + + if errS != "" { + return fmt.Errorf(errS) + } + return nil +} + +// this is launched as a goroutine to continuously watch a tailed logfile +// for changes, parse any incoming msgs, and add to the accumulator. +func (t *Tail) receiver(tailer *tail.Tail) { + t.wg.Add(1) + defer t.wg.Done() + + var err error + var line *tail.Line + for line = range tailer.Lines { + if line.Err != nil { + log.Printf("ERROR tailing file %s, Error: %s\n", + tailer.Filename, err) + continue + } + err = t.Parse(line.Text) + if err != nil { + log.Printf("ERROR: %s", err) + } + } +} + +func (t *Tail) Parse(line string) error { + // find the timestamp: + match := tRe.FindAllStringSubmatch(line, -1) + if len(match) < 1 { + return nil + } + if len(match[0]) < 9 { + return nil + } + // make an rfc3339 timestamp and parse it: + ts, err := time.Parse(time.RFC3339Nano, + fmt.Sprintf(rfcFormat, match[0][2], match[0][3], match[0][4], match[0][5], match[0][6], match[0][7], match[0][8])) + if err != nil { + return nil + } + + fields := make(map[string]interface{}) + tags := make(map[string]string) + + // parse numerical fields: + for name, re := range t.numfieldsRe { + match := re.FindAllStringSubmatch(line, -1) + if len(match) < 1 { + continue + } + if len(match[0]) < 2 { + continue + } + num, err := strconv.ParseFloat(match[0][1], 64) + if err == nil { + fields[name] = num + } + } + + // parse string fields: + for name, re := range t.strfieldsRe { + match := re.FindAllStringSubmatch(line, -1) + if len(match) < 1 { + continue + } + if len(match[0]) < 2 { + continue + } + fields[name] = match[0][1] + } + + // parse tags: + for name, re := range t.tagsRe { + match := re.FindAllStringSubmatch(line, -1) + if len(match) < 1 { + continue + } + if len(match[0]) < 2 { + continue + } + tags[name] = match[0][1] + } + + if len(t.countersRe) > 0 { + // Make a unique key for the measurement name/tags + var tg []string + for k, v := range tags { + tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + } + sort.Strings(tg) + hash := fmt.Sprintf("%s%s", strings.Join(tg, ""), "igloo") + + // check if this hash already has a counter map + _, ok := t.counters[hash] + if !ok { + // doesnt have counter map, so make one + t.counters[hash] = make(map[string]int64) + } + + // search for counter matches: + for _, re := range t.countersRe { + match := re.FindAllStringSubmatch(line, -1) + if len(match) < 1 { + continue + } + if len(match[0]) < 3 { + continue + } + counterName := match[0][1] + "_" + match[0][2] + // increment this counter + t.counters[hash][counterName] += 1 + // add this counter to the output fields + fields[counterName] = t.counters[hash][counterName] + } + } + + t.acc.AddFields("igloo", fields, tags, ts) + return nil +} + +func (t *Tail) Stop() { + t.Lock() + defer t.Unlock() + + for _, t := range t.tailers { + err := t.Stop() + if err != nil { + log.Printf("ERROR stopping tail on file %s\n", t.Filename) + } + t.Cleanup() + } + t.wg.Wait() +} + +func init() { + inputs.Add("igloo", func() telegraf.Input { + return NewTail() + }) +}