From f2fb9cc1e80a05eee9a0e8299c0025e4fcba02b7 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer Date: Thu, 21 Jun 2018 19:01:38 -0400 Subject: [PATCH] Fix postfix input handling of multi-level queues (#4333) (cherry picked from commit 16454e25bad046fad13eaf670ab4973ddcd5b069) --- CHANGELOG.md | 1 + plugins/inputs/postfix/postfix.go | 61 ++++++++------------------ plugins/inputs/postfix/postfix_test.go | 21 ++++----- 3 files changed, 29 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e8c8e220..2e4d87993 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. +- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues. ## v1.7 [2018-06-12] diff --git a/plugins/inputs/postfix/postfix.go b/plugins/inputs/postfix/postfix.go index a33879760..8700362d0 100644 --- a/plugins/inputs/postfix/postfix.go +++ b/plugins/inputs/postfix/postfix.go @@ -4,7 +4,7 @@ import ( "fmt" "os" "os/exec" - "path" + "path/filepath" "strings" "time" @@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) { return strings.TrimSpace(string(qd)), nil } -func qScan(path string) (int64, int64, int64, error) { - f, err := os.Open(path) - if err != nil { - return 0, 0, 0, err - } - - finfos, err := f.Readdir(-1) - f.Close() - if err != nil { - return 0, 0, 0, err - } - +func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) { var length, size int64 var oldest time.Time - for _, finfo := range finfos { + err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error { + if err != nil { + acc.AddError(fmt.Errorf("error scanning %s: %s", path, err)) + return nil + } + if finfo.IsDir() { + return nil + } + length++ size += finfo.Size() ctime := statCTime(finfo.Sys()) if ctime.IsZero() { - continue + return nil } if oldest.IsZero() || ctime.Before(oldest) { oldest = ctime } + return nil + }) + if err != nil { + return 0, 0, 0, err } var age int64 if !oldest.IsZero() { age = int64(time.Now().Sub(oldest) / time.Second) - } else if len(finfos) != 0 { + } else if length != 0 { // system doesn't support ctime age = -1 } @@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error { } } - for _, q := range []string{"active", "hold", "incoming", "maildrop"} { - length, size, age, err := qScan(path.Join(p.QueueDirectory, q)) + for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { + length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc) if err != nil { acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) continue @@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error { acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) } - var dLength, dSize int64 - dAge := int64(-1) - for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} { - length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q)) - if err != nil { - if os.IsNotExist(err) { - // the directories are created on first use - continue - } - acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err)) - return nil - } - dLength += length - dSize += size - if age > dAge { - dAge = age - } - } - fields := map[string]interface{}{"length": dLength, "size": dSize} - if dAge != -1 { - fields["age"] = dAge - } - acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"}) - return nil } diff --git a/plugins/inputs/postfix/postfix_test.go b/plugins/inputs/postfix/postfix_test.go index 75a6817a7..5dbc91d13 100644 --- a/plugins/inputs/postfix/postfix_test.go +++ b/plugins/inputs/postfix/postfix_test.go @@ -3,7 +3,7 @@ package postfix import ( "io/ioutil" "os" - "path" + "path/filepath" "testing" "github.com/influxdata/telegraf/testutil" @@ -16,19 +16,16 @@ func TestGather(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(td) - for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { - require.NoError(t, os.Mkdir(path.Join(td, q), 0755)) - } - for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off - require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755)) + for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} { + require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755)) } - require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644)) - require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644)) + require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644)) p := Postfix{ QueueDirectory: td,