From 7a8e8217318236b9fc0e5b306cbe91b1142a7472 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 6 Mar 2017 15:59:36 +0000 Subject: [PATCH] Revert "Procstat: don't cache PIDs" (#2479) --- CHANGELOG.md | 2 - plugins/inputs/procstat/procstat.go | 44 ++++++++++++++++--- plugins/inputs/procstat/procstat_test.go | 2 + plugins/inputs/procstat/spec_processor.go | 53 ++++++++++------------- 4 files changed, 61 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 323b23915..5773179b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,8 +61,6 @@ be deprecated eventually. - [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int. - [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection. -- [#2287](https://github.com/influxdata/telegraf/issues/2287): Kubernetes input: Handle null startTime for stopped pods -- [#1636](https://github.com/influxdata/telegraf/issues/1636): procstat - stop caching PIDs. - [#2318](https://github.com/influxdata/telegraf/issues/2318): haproxy input - Add missing fields. - [#2287](https://github.com/influxdata/telegraf/issues/2287): Kubernetes input: Handle null startTime for stopped pods. - [#2356](https://github.com/influxdata/telegraf/issues/2356): cpu input panic when /proc/stat is empty. diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 565d0ebd1..929490e4a 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -8,6 +8,8 @@ import ( "strconv" "strings" + "github.com/shirou/gopsutil/process" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -21,12 +23,15 @@ type Procstat struct { User string PidTag bool + // pidmap maps a pid to a process object, so we don't recreate every gather + pidmap map[int32]*process.Process // tagmap maps a pid to a map of tags for that pid tagmap map[int32]map[string]string } func NewProcstat() *Procstat { return &Procstat{ + pidmap: make(map[int32]*process.Process), tagmap: make(map[int32]map[string]string), } } @@ -62,26 +67,51 @@ func (_ *Procstat) Description() string { } func (p *Procstat) Gather(acc telegraf.Accumulator) error { - pids, err := p.getAllPids() + err := p.createProcesses() if err != nil { log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) } else { - for _, pid := range pids { + for pid, proc := range p.pidmap { if p.PidTag { p.tagmap[pid]["pid"] = fmt.Sprint(pid) } - p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, p.tagmap[pid]) - err := p.pushMetrics() - if err != nil { - log.Printf("E! Error: procstat: %s", err.Error()) - } + p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid]) + p.pushMetrics() } } return nil } +func (p *Procstat) createProcesses() error { + var errstring string + var outerr error + + pids, err := p.getAllPids() + if err != nil { + errstring += err.Error() + " " + } + + for _, pid := range pids { + _, ok := p.pidmap[pid] + if !ok { + proc, err := process.NewProcess(pid) + if err == nil { + p.pidmap[pid] = proc + } else { + errstring += err.Error() + " " + } + } + } + + if errstring != "" { + outerr = fmt.Errorf("%s", errstring) + } + + return outerr +} + func (p *Procstat) getAllPids() ([]int32, error) { var pids []int32 var err error diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index 001537178..ccc72bdbb 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -6,6 +6,7 @@ import ( "strconv" "testing" + "github.com/shirou/gopsutil/process" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,6 +24,7 @@ func TestGather(t *testing.T) { p := Procstat{ PidFile: file.Name(), Prefix: "foo", + pidmap: make(map[int32]*process.Process), tagmap: make(map[int32]map[string]string), } p.Gather(&acc) diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index 1b9f63126..3b56fbc3e 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -1,7 +1,6 @@ package procstat import ( - "fmt" "time" "github.com/shirou/gopsutil/process" @@ -10,13 +9,12 @@ import ( ) type SpecProcessor struct { - ProcessName string - Prefix string - pid int32 - tags map[string]string - fields map[string]interface{} - acc telegraf.Accumulator - proc *process.Process + Prefix string + pid int32 + tags map[string]string + fields map[string]interface{} + acc telegraf.Accumulator + proc *process.Process } func NewSpecProcessor( @@ -24,35 +22,29 @@ func NewSpecProcessor( prefix string, pid int32, acc telegraf.Accumulator, + p *process.Process, tags map[string]string, ) *SpecProcessor { + if processName != "" { + tags["process_name"] = processName + } else { + name, err := p.Name() + if err == nil { + tags["process_name"] = name + } + } return &SpecProcessor{ - ProcessName: processName, - Prefix: prefix, - pid: pid, - tags: tags, - fields: make(map[string]interface{}), - acc: acc, + Prefix: prefix, + pid: pid, + tags: tags, + fields: make(map[string]interface{}), + acc: acc, + proc: p, } } -func (p *SpecProcessor) pushMetrics() error { +func (p *SpecProcessor) pushMetrics() { var prefix string - proc, err := process.NewProcess(p.pid) - if err != nil { - return fmt.Errorf("Failed to open process with pid '%d'. Error: '%s'", - p.pid, err) - } - p.proc = proc - if p.ProcessName != "" { - p.tags["process_name"] = p.ProcessName - } else { - name, err := p.proc.Name() - if err == nil { - p.tags["process_name"] = name - } - } - if p.Prefix != "" { prefix = p.Prefix + "_" } @@ -115,5 +107,4 @@ func (p *SpecProcessor) pushMetrics() error { } p.acc.AddFields("procstat", fields, p.tags) - return nil }