diff --git a/CHANGELOG.md b/CHANGELOG.md index bfb65f362..ab8c45ea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ - [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input. - [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin. - [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input. +- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format. +- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin. ## v1.7.1 [unreleased] @@ -27,6 +29,7 @@ - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. +- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues. ## v1.7 [2018-06-12] diff --git a/Makefile b/Makefile index 4d78ab1e1..8650df267 100644 --- a/Makefile +++ b/Makefile @@ -54,11 +54,11 @@ fmtcheck: @echo '[INFO] done.' test-windows: - go test ./plugins/inputs/ping/... - go test ./plugins/inputs/win_perf_counters/... - go test ./plugins/inputs/win_services/... - go test ./plugins/inputs/procstat/... - go test ./plugins/inputs/ntpq/... + go test -short ./plugins/inputs/ping/... + go test -short ./plugins/inputs/win_perf_counters/... + go test -short ./plugins/inputs/win_services/... + go test -short ./plugins/inputs/procstat/... + go test -short ./plugins/inputs/ntpq/... # vet runs the Go source code static analysis tool `vet` to find # any common errors. diff --git a/plugins/inputs/logparser/README.md b/plugins/inputs/logparser/README.md index 7a1df8bc8..1caa3830c 100644 --- a/plugins/inputs/logparser/README.md +++ b/plugins/inputs/logparser/README.md @@ -108,7 +108,9 @@ You must capture at least one field per line. - ts-"CUSTOM" CUSTOM time layouts must be within quotes and be the representation of the -"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006` +"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`. +To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"` +To match a comma decimal point you can use a period in the pattern string. See https://golang.org/pkg/time/#Parse for more details. Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns), diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 2bde576c7..e9b4d72d8 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { case DROP: // goodbye! default: + // Replace commas with dot character + v = strings.Replace(v, ",", ".", -1) + ts, err := time.ParseInLocation(t, v, p.loc) if err == nil { if ts.Year() == 0 { diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index caafa9dc1..3a88b27e6 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -983,6 +983,23 @@ func TestSyslogTimestampParser(t *testing.T) { require.Equal(t, 2018, m.Time().Year()) } +func TestReplaceTimestampComma(t *testing.T) { + + p := &Parser{ + Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`}, + } + + require.NoError(t, p.Compile()) + m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1") + require.NoError(t, err) + require.NotNil(t, m) + + require.Equal(t, 2018, m.Time().Year()) + require.Equal(t, 13, m.Time().Hour()) + require.Equal(t, 34, m.Time().Second()) + //Convert Nanosecond to milisecond for compare + require.Equal(t, 555, m.Time().Nanosecond()/1000000) + func TestEmptyYearInTimestamp(t *testing.T) { p := &Parser{ Patterns: []string{`%{APPLE_SYSLOG_TIME_SHORT:timestamp:ts-"Jan 2 15:04:05"} %{HOSTNAME} %{APP_NAME:app_name}\[%{NUMBER:pid:int}\]%{GREEDYDATA:message}`}, @@ -997,4 +1014,5 @@ func TestEmptyYearInTimestamp(t *testing.T) { require.NoError(t, err) require.NotNil(t, m) require.Equal(t, 2018, m.Time().Year()) + } diff --git a/plugins/inputs/postfix/postfix.go b/plugins/inputs/postfix/postfix.go index a33879760..8700362d0 100644 --- a/plugins/inputs/postfix/postfix.go +++ b/plugins/inputs/postfix/postfix.go @@ -4,7 +4,7 @@ import ( "fmt" "os" "os/exec" - "path" + "path/filepath" "strings" "time" @@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) { return strings.TrimSpace(string(qd)), nil } -func qScan(path string) (int64, int64, int64, error) { - f, err := os.Open(path) - if err != nil { - return 0, 0, 0, err - } - - finfos, err := f.Readdir(-1) - f.Close() - if err != nil { - return 0, 0, 0, err - } - +func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) { var length, size int64 var oldest time.Time - for _, finfo := range finfos { + err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error { + if err != nil { + acc.AddError(fmt.Errorf("error scanning %s: %s", path, err)) + return nil + } + if finfo.IsDir() { + return nil + } + length++ size += finfo.Size() ctime := statCTime(finfo.Sys()) if ctime.IsZero() { - continue + return nil } if oldest.IsZero() || ctime.Before(oldest) { oldest = ctime } + return nil + }) + if err != nil { + return 0, 0, 0, err } var age int64 if !oldest.IsZero() { age = int64(time.Now().Sub(oldest) / time.Second) - } else if len(finfos) != 0 { + } else if length != 0 { // system doesn't support ctime age = -1 } @@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error { } } - for _, q := range []string{"active", "hold", "incoming", "maildrop"} { - length, size, age, err := qScan(path.Join(p.QueueDirectory, q)) + for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { + length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc) if err != nil { acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) continue @@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error { acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) } - var dLength, dSize int64 - dAge := int64(-1) - for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} { - length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q)) - if err != nil { - if os.IsNotExist(err) { - // the directories are created on first use - continue - } - acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err)) - return nil - } - dLength += length - dSize += size - if age > dAge { - dAge = age - } - } - fields := map[string]interface{}{"length": dLength, "size": dSize} - if dAge != -1 { - fields["age"] = dAge - } - acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"}) - return nil } diff --git a/plugins/inputs/postfix/postfix_test.go b/plugins/inputs/postfix/postfix_test.go index 75a6817a7..5dbc91d13 100644 --- a/plugins/inputs/postfix/postfix_test.go +++ b/plugins/inputs/postfix/postfix_test.go @@ -3,7 +3,7 @@ package postfix import ( "io/ioutil" "os" - "path" + "path/filepath" "testing" "github.com/influxdata/telegraf/testutil" @@ -16,19 +16,16 @@ func TestGather(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(td) - for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { - require.NoError(t, os.Mkdir(path.Join(td, q), 0755)) - } - for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off - require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755)) + for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} { + require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755)) } - require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644)) p := Postfix{ QueueDirectory: td, diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md index 5d7253649..27cb6418e 100644 --- a/plugins/inputs/tail/README.md +++ b/plugins/inputs/tail/README.md @@ -1,4 +1,4 @@ -# tail Input Plugin +# Tail Input Plugin The tail plugin "tails" a logfile and parses each log message. @@ -49,3 +49,7 @@ The plugin expects messages in one of the data_format = "influx" ``` +### Metrics: + +Metrics are produced according to the `data_format` option. Additionally a +tag labeled `path` is added to the metric containing the filename being tailed. diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 86534da5f..0de2a344c 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) { m, err = t.parser.ParseLine(text) if err == nil { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + if m != nil { + tags := m.Tags() + tags["path"] = tailer.Filename + t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) + } } else { t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", tailer.Filename, line.Text, err))