diff --git a/plugins/inputs/reader/reader.go b/plugins/inputs/reader/reader.go new file mode 100644 index 000000000..74b180e25 --- /dev/null +++ b/plugins/inputs/reader/reader.go @@ -0,0 +1,106 @@ +package reader + +import ( + "io/ioutil" + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type Reader struct { + Filepaths []string `toml:"files"` + FromBeginning bool + DataFormat string `toml:"data_format"` + ParserConfig parsers.Config + Parser parsers.Parser + Tags []string + + Filenames []string +} + +const sampleConfig = `## Log files to parse. +## These accept standard unix glob matching rules, but with the addition of +## ** as a "super asterisk". ie: +## /var/log/**.log -> recursively find all .log files in /var/log +## /var/log/*/*.log -> find all .log files with a parent dir in /var/log +## /var/log/apache.log -> only tail the apache log file +files = ["/var/log/apache/access.log"] + +## The dataformat to be read from files +## Each data format has its own unique set of configuration options, read +## more about them here: +## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +data_format = "" +'''` + +// SampleConfig returns the default configuration of the Input +func (r *Reader) SampleConfig() string { + return sampleConfig +} + +func (r *Reader) Description() string { + return "reload and gather from file[s] on telegraf's interval" +} + +func (r *Reader) Gather(acc telegraf.Accumulator) error { + r.refreshFilePaths() + for _, k := range r.Filenames { + metrics, err := r.readMetric(k) + if err != nil { + return err + } + + for _, m := range metrics { + acc.AddFields(m.Name(), m.Fields(), m.Tags()) + } + } + return nil +} + +func (r *Reader) compileParser() { + if r.DataFormat == "grok" { + log.Printf("Grok isn't supported yet") + return + } + r.ParserConfig = parsers.Config{ + DataFormat: r.DataFormat, + TagKeys: r.Tags, + } + nParser, err := parsers.NewParser(&r.ParserConfig) + if err != nil { + log.Printf("E! Error building parser: %v", err) + } + + r.Parser = nParser +} + +func (r *Reader) refreshFilePaths() { + var allFiles []string + for _, filepath := range r.Filepaths { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) + continue + } + files := g.Match() + + for k := range files { + allFiles = append(allFiles, k) + } + } + + r.Filenames = allFiles +} + +//requires that Parser has been compiled +func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { + fileContents, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("E! File could not be opened: %v", filename) + } + + return r.Parser.Parse(fileContents) + +} diff --git a/plugins/inputs/reader/reader_test.go b/plugins/inputs/reader/reader_test.go new file mode 100644 index 000000000..e073a6f54 --- /dev/null +++ b/plugins/inputs/reader/reader_test.go @@ -0,0 +1,41 @@ +package reader + +import ( + "log" + "runtime" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func TestRefreshFilePaths(t *testing.T) { + testDir := getPluginDir() + r := Reader{ + Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"}, + } + + r.refreshFilePaths() + //log.Printf("filenames: %v", filenames) + assert.Equal(t, len(r.Filenames), 2) +} +func TestJSONParserCompile(t *testing.T) { + testDir := getPluginDir() + var acc testutil.Accumulator + r := Reader{ + Filepaths: []string{testDir + "/reader/testfiles/**.log"}, + DataFormat: "json", + Tags: []string{"parent_ignored_child"}, + } + r.compileParser() + r.Gather(&acc) + log.Printf("acc: %v", acc.Metrics[0].Tags) + assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) + assert.Equal(t, 5, len(acc.Metrics[0].Fields)) +} + +func getPluginDir() string { + _, filename, _, _ := runtime.Caller(1) + return strings.Replace(filename, "/reader/reader_test.go", "", 1) +} diff --git a/plugins/inputs/reader/testfiles/json_a.log b/plugins/inputs/reader/testfiles/json_a.log new file mode 100644 index 000000000..739fd65d8 --- /dev/null +++ b/plugins/inputs/reader/testfiles/json_a.log @@ -0,0 +1,14 @@ +{ + "parent": { + "child": 3.0, + "ignored_child": "hi" + }, + "ignored_null": null, + "integer": 4, + "list": [3, 4], + "ignored_parent": { + "another_ignored_null": null, + "ignored_string": "hello, world!" + }, + "another_list": [4] + } \ No newline at end of file