From d4fe485d188038bd0f92c077a7c2cd19a1bafbc4 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 8 Mar 2016 11:42:31 +0100 Subject: [PATCH] Cross platform support for the 'processes' plugin closes #798 --- CHANGELOG.md | 2 + README.md | 4 +- etc/telegraf.conf | 4 + plugins/inputs/system/PROCESSES_README.md | 58 ++++++ plugins/inputs/system/processes.go | 219 ++++++++++++++++++---- plugins/inputs/system/processes_test.go | 133 ++++++++++++- scripts/circle-test.sh | 2 +- 7 files changed, 384 insertions(+), 38 deletions(-) create mode 100644 plugins/inputs/system/PROCESSES_README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eb9fa652..5ef68bd45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. - [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! - [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! +- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" @@ -24,6 +25,7 @@ - [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert! - [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package - [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory +- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable. ## v0.10.4.1 diff --git a/README.md b/README.md index e9c20996a..fb9363100 100644 --- a/README.md +++ b/README.md @@ -214,11 +214,13 @@ Currently implemented sources: * disk * diskio * swap + * processes Telegraf can also collect metrics via the following service plugins: * statsd -* udp listener +* udp_listener +* tcp_listener * mqtt_consumer * kafka_consumer * nats_consumer diff --git a/etc/telegraf.conf b/etc/telegraf.conf index a6057ecd2..3deb7f895 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -123,6 +123,10 @@ [[inputs.mem]] # no configuration +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration + # Read metrics about swap memory usage [[inputs.swap]] # no configuration diff --git a/plugins/inputs/system/PROCESSES_README.md b/plugins/inputs/system/PROCESSES_README.md new file mode 100644 index 000000000..006e043fb --- /dev/null +++ b/plugins/inputs/system/PROCESSES_README.md @@ -0,0 +1,58 @@ +# Processes Input Plugin + +This plugin gathers info about the total number of processes and groups +them by status (zombie, sleeping, running, etc.) + +On linux this plugin requires access to procfs (/proc), on other OSes +it requires access to execute `ps`. + +### Configuration: + +```toml +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration +``` + +### Measurements & Fields: + +- processes + - blocked (aka disk sleep or uninterruptible sleep) + - running + - sleeping + - stopped + - total + - zombie + - wait (freebsd only) + - idle (bsd only) + - paging (linux only) + - total_threads (linux only) + +### Process State Mappings + +Different OSes use slightly different State codes for their processes, these +state codes are documented in `man ps`, and I will give a mapping of what major +OS state codes correspond to in telegraf metrics: + +``` +Linux FreeBSD Darwin meaning + R R R running + S S S sleeping + Z Z Z zombie + T T T stopped + none I I idle (sleeping for longer than about 20 seconds) + D D,L U blocked (waiting in uninterruptible sleep, or locked) + W W none paging (linux kernel < 2.6 only), wait (freebsd) +``` + +### Tags: + +None + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test +* Plugin: processes, Collection 1 +> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042 +``` diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index c4b791e3c..b7ee32066 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -1,61 +1,216 @@ +// +build !windows + package system import ( + "bytes" "fmt" + "io/ioutil" "log" + "os" + "os/exec" + "path" + "runtime" + "strconv" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/shirou/gopsutil/process" ) type Processes struct { + execPS func() ([]byte, error) + readProcFile func(statFile string) ([]byte, error) + + forcePS bool + forceProc bool } -func (_ *Processes) Description() string { - return "Get the number of processes and group them by status (Linux only)" +func (p *Processes) Description() string { + return "Get the number of processes and group them by status" } -func (_ *Processes) SampleConfig() string { return "" } +func (p *Processes) SampleConfig() string { return "" } -func (s *Processes) Gather(acc telegraf.Accumulator) error { - pids, err := process.Pids() - if err != nil { - return fmt.Errorf("error getting pids list: %s", err) +func (p *Processes) Gather(acc telegraf.Accumulator) error { + // Get an empty map of metric fields + fields := getEmptyFields() + + // Decide if we will use 'ps' to get stats (use procfs otherwise) + usePS := true + if runtime.GOOS == "linux" { + usePS = false } - // TODO handle other OS (Windows/BSD/Solaris/OSX) - fields := map[string]interface{}{ - "paging": uint64(0), - "blocked": uint64(0), - "zombie": uint64(0), - "stopped": uint64(0), - "running": uint64(0), - "sleeping": uint64(0), + if p.forcePS { + usePS = true + } else if p.forceProc { + usePS = false } - for _, pid := range pids { - process, err := process.NewProcess(pid) - if err != nil { - log.Printf("Can not get process %d status: %s", pid, err) - continue + + // Gather stats from 'ps' or procfs + if usePS { + if err := p.gatherFromPS(fields); err != nil { + return err } - status, err := process.Status() - if err != nil { - log.Printf("Can not get process %d status: %s\n", pid, err) - continue + } else { + if err := p.gatherFromProc(fields); err != nil { + return err } - _, exists := fields[status] - if !exists { - log.Printf("Status '%s' for process with pid: %d\n", status, pid) - continue - } - fields[status] = fields[status].(uint64) + uint64(1) } acc.AddFields("processes", fields, nil) return nil } + +// Gets empty fields of metrics based on the OS +func getEmptyFields() map[string]interface{} { + fields := map[string]interface{}{ + "blocked": int64(0), + "zombie": int64(0), + "stopped": int64(0), + "running": int64(0), + "sleeping": int64(0), + "total": int64(0), + } + switch runtime.GOOS { + case "freebsd": + fields["idle"] = int64(0) + fields["wait"] = int64(0) + case "darwin": + fields["idle"] = int64(0) + case "openbsd": + fields["idle"] = int64(0) + case "linux": + fields["paging"] = int64(0) + fields["total_threads"] = int64(0) + } + return fields +} + +// exec `ps` to get all process states +func (p *Processes) gatherFromPS(fields map[string]interface{}) error { + out, err := p.execPS() + if err != nil { + return err + } + + for i, status := range bytes.Fields(out) { + if i == 0 && string(status) == "STAT" { + // This is a header, skip it + continue + } + switch status[0] { + case 'W': + fields["wait"] = fields["wait"].(int64) + int64(1) + case 'U', 'D', 'L': + // Also known as uninterruptible sleep or disk sleep + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombie"] = fields["zombie"].(int64) + int64(1) + case 'T': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'I': + fields["idle"] = fields["idle"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] from ps", + string(status[0])) + } + fields["total"] = fields["total"].(int64) + int64(1) + } + return nil +} + +// get process states from /proc/(pid)/stat files +func (p *Processes) gatherFromProc(fields map[string]interface{}) error { + files, err := ioutil.ReadDir("/proc") + if err != nil { + return err + } + + for _, file := range files { + if !file.IsDir() { + continue + } + + statFile := path.Join("/proc", file.Name(), "stat") + data, err := p.readProcFile(statFile) + if err != nil { + return err + } + if data == nil { + continue + } + + stats := bytes.Fields(data) + if len(stats) < 3 { + return fmt.Errorf("Something is terribly wrong with %s", statFile) + } + switch stats[2][0] { + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'D': + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombies"] = fields["zombies"].(int64) + int64(1) + case 'T', 't': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'W': + fields["paging"] = fields["paging"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] in file %s", + string(stats[2][0]), statFile) + } + fields["total"] = fields["total"].(int64) + int64(1) + + threads, err := strconv.Atoi(string(stats[19])) + if err != nil { + log.Printf("processes: Error parsing thread count: %s", err) + continue + } + fields["total_threads"] = fields["total_threads"].(int64) + int64(threads) + } + return nil +} + +func readProcFile(statFile string) ([]byte, error) { + if _, err := os.Stat(statFile); os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + data, err := ioutil.ReadFile(statFile) + if err != nil { + return nil, err + } + + return data, nil +} + +func execPS() ([]byte, error) { + bin, err := exec.LookPath("ps") + if err != nil { + return nil, err + } + + out, err := exec.Command(bin, "axo", "state").Output() + if err != nil { + return nil, err + } + + return out, err +} + func init() { inputs.Add("processes", func() telegraf.Input { - return &Processes{} + return &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } }) } diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go index 246884711..0e2b5e105 100644 --- a/plugins/inputs/system/processes_test.go +++ b/plugins/inputs/system/processes_test.go @@ -1,6 +1,8 @@ package system import ( + "fmt" + "runtime" "testing" "github.com/influxdata/telegraf/testutil" @@ -9,13 +11,136 @@ import ( ) func TestProcesses(t *testing.T) { - processes := &Processes{} + processes := &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } var acc testutil.Accumulator err := processes.Gather(&acc) require.NoError(t, err) - assert.True(t, acc.HasUIntField("processes", "running")) - assert.True(t, acc.HasUIntField("processes", "sleeping")) - assert.True(t, acc.HasUIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "running")) + assert.True(t, acc.HasIntField("processes", "sleeping")) + assert.True(t, acc.HasIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "total")) + total, ok := acc.Get("processes") + require.True(t, ok) + assert.True(t, total.Fields["total"].(int64) > 0) } + +func TestFromPS(t *testing.T) { + processes := &Processes{ + execPS: testExecPS, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["blocked"] = int64(1) + fields["running"] = int64(4) + fields["sleeping"] = int64(34) + fields["total"] = int64(39) + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func TestFromPSError(t *testing.T) { + processes := &Processes{ + execPS: testExecPSError, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.Error(t, err) +} + +func TestFromProcFiles(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("This test only runs on linux") + } + tester := tester{} + processes := &Processes{ + readProcFile: tester.testProcFile, + forceProc: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["sleeping"] = tester.calls + fields["total_threads"] = tester.calls * 2 + fields["total"] = tester.calls + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func testExecPS() ([]byte, error) { + return []byte(testPSOut), nil +} + +// struct for counting calls to testProcFile +type tester struct { + calls int64 +} + +func (t *tester) testProcFile(_ string) ([]byte, error) { + t.calls++ + return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil +} + +func testExecPSError() ([]byte, error) { + return []byte(testPSOut), fmt.Errorf("ERROR!") +} + +const testPSOut = ` +STAT +S +S +S +S +R +R +S +S +Ss +Ss +S +SNs +Ss +Ss +S +R+ +S +U +S +S +S +S +Ss +S+ +Ss +S +S+ +S+ +Ss +S+ +Ss +S +R+ +Ss +S +S+ +S+ +Ss +S+ +` + +const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +` diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 9a3e0e678..f0288c73e 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml exit_if_fail telegraf -config $tmpdir/config.toml \ -test -input-filter cpu:mem -mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS +cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz eval "git describe --exact-match HEAD" if [ $? -eq 0 ]; then