From fdf00c1be6ebf1a6290c448551abae41e46e1076 Mon Sep 17 00:00:00 2001 From: Ranjib Dey Date: Sat, 3 Oct 2015 22:09:18 -0700 Subject: [PATCH] Monitor process by pidfile or exe name --- plugins/all/all.go | 1 + plugins/procstat/procstat.go | 104 ++++++++++++++++++++++++++++ plugins/procstat/procstat_test.go | 28 ++++++++ plugins/procstat/spec_processor.go | 107 +++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+) create mode 100644 plugins/procstat/procstat.go create mode 100644 plugins/procstat/procstat_test.go create mode 100644 plugins/procstat/spec_processor.go diff --git a/plugins/all/all.go b/plugins/all/all.go index ffef12b33..1cb115bfc 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -16,6 +16,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/nginx" _ "github.com/influxdb/telegraf/plugins/ping" _ "github.com/influxdb/telegraf/plugins/postgresql" + _ "github.com/influxdb/telegraf/plugins/procstat" _ "github.com/influxdb/telegraf/plugins/prometheus" _ "github.com/influxdb/telegraf/plugins/rabbitmq" _ "github.com/influxdb/telegraf/plugins/redis" diff --git a/plugins/procstat/procstat.go b/plugins/procstat/procstat.go new file mode 100644 index 000000000..29c7bc1ae --- /dev/null +++ b/plugins/procstat/procstat.go @@ -0,0 +1,104 @@ +package procstat + +import ( + "fmt" + "github.com/influxdb/telegraf/plugins" + "github.com/shirou/gopsutil/process" + "io/ioutil" + "os/exec" + "strconv" + "strings" + "sync" +) + +type Specification struct { + PidFile string `toml:pid_file` + Exe string + Prefix string +} + +type Procstat struct { + Specifications []*Specification +} + +func NewProcstat() *Procstat { + return &Procstat{} +} + +var sampleConfig = ` + [[process.specifications]] + # pid file + pid_file = "/path/to/foo.pid" + # executable name (used by pgrep) + exe = "/path/to/foo" + name = "foo" # required +` + +func (_ *Procstat) SampleConfig() string { + return sampleConfig +} + +func (_ *Procstat) Description() string { + return "Monitor process cpu and memory usage" +} + +func (p *Procstat) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + var outerr error + for _, specification := range p.Specifications { + wg.Add(1) + go func(spec *Specification, acc plugins.Accumulator) { + defer wg.Done() + proc, err := spec.createProcess() + if err != nil { + outerr = err + } else { + outerr = NewSpecProcessor(spec.Prefix, acc, proc).pushMetrics() + } + }(specification, acc) + } + wg.Wait() + return outerr +} + +func (spec *Specification) createProcess() (*process.Process, error) { + if spec.PidFile != "" { + pid, err := pidFromFile(spec.PidFile) + if err != nil { + return nil, err + } + return process.NewProcess(int32(pid)) + } else if spec.Exe != "" { + pid, err := pidFromExe(spec.Exe) + if err != nil { + return nil, err + } + return process.NewProcess(int32(pid)) + } else { + return nil, fmt.Errorf("Either exe or pid_file has to be specified") + } +} + +func pidFromFile(file string) (int, error) { + pidString, err := ioutil.ReadFile(file) + if err != nil { + return -1, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'", file, err) + } else { + return strconv.Atoi(strings.TrimSpace(string(pidString))) + } +} + +func pidFromExe(exe string) (int, error) { + pidString, err := exec.Command("pgrep", exe).Output() + if err != nil { + return -1, fmt.Errorf("Failed to execute pgrep. Error: '%s'", err) + } else { + return strconv.Atoi(strings.TrimSpace(string(pidString))) + } +} + +func init() { + plugins.Add("process", func() plugins.Plugin { + return NewProcstat() + }) +} diff --git a/plugins/procstat/procstat_test.go b/plugins/procstat/procstat_test.go new file mode 100644 index 000000000..0f02162d1 --- /dev/null +++ b/plugins/procstat/procstat_test.go @@ -0,0 +1,28 @@ +package procstat + +import ( + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "io/ioutil" + "os" + "strconv" + "testing" +) + +func TestGather(t *testing.T) { + var acc testutil.Accumulator + pid := os.Getpid() + file, err := ioutil.TempFile(os.TempDir(), "telegraf") + require.NoError(t, err) + file.Write([]byte(strconv.Itoa(pid))) + file.Close() + defer os.Remove(file.Name()) + specifications := []*Specification{&Specification{PidFile: file.Name(), Prefix: "foo"}} + p := Procstat{ + Specifications: specifications, + } + p.Gather(&acc) + assert.True(t, acc.HasFloatValue("foo_cpu_user")) + assert.True(t, acc.HasUIntValue("foo_memory_vms")) +} diff --git a/plugins/procstat/spec_processor.go b/plugins/procstat/spec_processor.go new file mode 100644 index 000000000..af33467eb --- /dev/null +++ b/plugins/procstat/spec_processor.go @@ -0,0 +1,107 @@ +package procstat + +import ( + "fmt" + "github.com/influxdb/telegraf/plugins" + "github.com/shirou/gopsutil/process" +) + +type SpecProcessor struct { + Prefix string + tags map[string]string + acc plugins.Accumulator + proc *process.Process +} + +func (p *SpecProcessor) add(metric string, value interface{}) { + p.acc.Add(p.Prefix+"_"+metric, value, p.tags) +} + +func NewSpecProcessor(prefix string, acc plugins.Accumulator, p *process.Process) *SpecProcessor { + return &SpecProcessor{ + Prefix: prefix, + tags: map[string]string{}, + acc: acc, + proc: p, + } +} + +func (p *SpecProcessor) pushMetrics() error { + if err := p.pushFDStats(); err != nil { + return err + } + if err := p.pushCtxStats(); err != nil { + return err + } + if err := p.pushIOStats(); err != nil { + return err + } + if err := p.pushCPUStats(); err != nil { + return err + } + if err := p.pushMemoryStats(); err != nil { + return err + } + return nil +} + +func (p *SpecProcessor) pushFDStats() error { + fds, err := p.proc.NumFDs() + if err != nil { + return fmt.Errorf("NumFD error: %s\n", err) + } + p.add("num_fds", fds) + return nil +} + +func (p *SpecProcessor) pushCtxStats() error { + ctx, err := p.proc.NumCtxSwitches() + if err != nil { + return fmt.Errorf("ContextSwitch error: %s\n", err) + } + p.add("voluntary_context_switches", ctx.Voluntary) + p.add("involuntary_context_switches", ctx.Involuntary) + return nil +} + +func (p *SpecProcessor) pushIOStats() error { + io, err := p.proc.IOCounters() + if err != nil { + return fmt.Errorf("IOCounters error: %s\n", err) + } + p.add("read_count", io.ReadCount) + p.add("write_count", io.WriteCount) + p.add("read_bytes", io.ReadBytes) + p.add("write_bytes", io.WriteCount) + return nil +} + +func (p *SpecProcessor) pushCPUStats() error { + cpu, err := p.proc.CPUTimes() + if err != nil { + return err + } + p.add("cpu_user", cpu.User) + p.add("cpu_system", cpu.System) + p.add("cpu_idle", cpu.Idle) + p.add("cpu_nice", cpu.Nice) + p.add("cpu_iowait", cpu.Iowait) + p.add("cpu_irq", cpu.Irq) + p.add("cpu_soft_irq", cpu.Softirq) + p.add("cpu_soft_steal", cpu.Steal) + p.add("cpu_soft_stolen", cpu.Stolen) + p.add("cpu_soft_guest", cpu.Guest) + p.add("cpu_soft_guest_nice", cpu.GuestNice) + return nil +} + +func (p *SpecProcessor) pushMemoryStats() error { + mem, err := p.proc.MemoryInfo() + if err != nil { + return err + } + p.add("memory_rss", mem.RSS) + p.add("memory_vms", mem.VMS) + p.add("memory_swap", mem.Swap) + return nil +}