Cross platform support for the 'processes' plugin

closes #798
This commit is contained in:
Cameron Sparr 2016-03-08 11:42:31 +01:00
parent 5ffa2a30be
commit 2f45b8b7f5
7 changed files with 384 additions and 38 deletions

View File

@ -14,6 +14,7 @@
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. - [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert! - [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert!
- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998! - [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998!
- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert!
### Bugfixes ### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
@ -24,6 +25,7 @@
- [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert! - [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert!
- [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package - [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package
- [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory - [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory
- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable.
## v0.10.4.1 ## v0.10.4.1

View File

@ -214,11 +214,13 @@ Currently implemented sources:
* disk * disk
* diskio * diskio
* swap * swap
* processes
Telegraf can also collect metrics via the following service plugins: Telegraf can also collect metrics via the following service plugins:
* statsd * statsd
* udp listener * udp_listener
* tcp_listener
* mqtt_consumer * mqtt_consumer
* kafka_consumer * kafka_consumer
* nats_consumer * nats_consumer

View File

@ -123,6 +123,10 @@
[[inputs.mem]] [[inputs.mem]]
# no configuration # no configuration
# Get the number of processes and group them by status
[[inputs.processes]]
# no configuration
# Read metrics about swap memory usage # Read metrics about swap memory usage
[[inputs.swap]] [[inputs.swap]]
# no configuration # no configuration

View File

@ -0,0 +1,58 @@
# Processes Input Plugin
This plugin gathers info about the total number of processes and groups
them by status (zombie, sleeping, running, etc.)
On linux this plugin requires access to procfs (/proc), on other OSes
it requires access to execute `ps`.
### Configuration:
```toml
# Get the number of processes and group them by status
[[inputs.processes]]
# no configuration
```
### Measurements & Fields:
- processes
- blocked (aka disk sleep or uninterruptible sleep)
- running
- sleeping
- stopped
- total
- zombie
- wait (freebsd only)
- idle (bsd only)
- paging (linux only)
- total_threads (linux only)
### Process State Mappings
Different OSes use slightly different State codes for their processes, these
state codes are documented in `man ps`, and I will give a mapping of what major
OS state codes correspond to in telegraf metrics:
```
Linux FreeBSD Darwin meaning
R R R running
S S S sleeping
Z Z Z zombie
T T T stopped
none I I idle (sleeping for longer than about 20 seconds)
D D,L U blocked (waiting in uninterruptible sleep, or locked)
W W none paging (linux kernel < 2.6 only), wait (freebsd)
```
### Tags:
None
### Example Output:
```
$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test
* Plugin: processes, Collection 1
> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042
```

View File

@ -1,61 +1,216 @@
// +build !windows
package system package system
import ( import (
"bytes"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"os"
"os/exec"
"path"
"runtime"
"strconv"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/shirou/gopsutil/process"
) )
type Processes struct { type Processes struct {
execPS func() ([]byte, error)
readProcFile func(statFile string) ([]byte, error)
forcePS bool
forceProc bool
} }
func (_ *Processes) Description() string { func (p *Processes) Description() string {
return "Get the number of processes and group them by status (Linux only)" return "Get the number of processes and group them by status"
} }
func (_ *Processes) SampleConfig() string { return "" } func (p *Processes) SampleConfig() string { return "" }
func (s *Processes) Gather(acc telegraf.Accumulator) error { func (p *Processes) Gather(acc telegraf.Accumulator) error {
pids, err := process.Pids() // Get an empty map of metric fields
if err != nil { fields := getEmptyFields()
return fmt.Errorf("error getting pids list: %s", err)
// Decide if we will use 'ps' to get stats (use procfs otherwise)
usePS := true
if runtime.GOOS == "linux" {
usePS = false
} }
// TODO handle other OS (Windows/BSD/Solaris/OSX) if p.forcePS {
fields := map[string]interface{}{ usePS = true
"paging": uint64(0), } else if p.forceProc {
"blocked": uint64(0), usePS = false
"zombie": uint64(0),
"stopped": uint64(0),
"running": uint64(0),
"sleeping": uint64(0),
} }
for _, pid := range pids {
process, err := process.NewProcess(pid) // Gather stats from 'ps' or procfs
if err != nil { if usePS {
log.Printf("Can not get process %d status: %s", pid, err) if err := p.gatherFromPS(fields); err != nil {
continue return err
} }
status, err := process.Status() } else {
if err != nil { if err := p.gatherFromProc(fields); err != nil {
log.Printf("Can not get process %d status: %s\n", pid, err) return err
continue
} }
_, exists := fields[status]
if !exists {
log.Printf("Status '%s' for process with pid: %d\n", status, pid)
continue
}
fields[status] = fields[status].(uint64) + uint64(1)
} }
acc.AddFields("processes", fields, nil) acc.AddFields("processes", fields, nil)
return nil return nil
} }
// Gets empty fields of metrics based on the OS
func getEmptyFields() map[string]interface{} {
fields := map[string]interface{}{
"blocked": int64(0),
"zombie": int64(0),
"stopped": int64(0),
"running": int64(0),
"sleeping": int64(0),
"total": int64(0),
}
switch runtime.GOOS {
case "freebsd":
fields["idle"] = int64(0)
fields["wait"] = int64(0)
case "darwin":
fields["idle"] = int64(0)
case "openbsd":
fields["idle"] = int64(0)
case "linux":
fields["paging"] = int64(0)
fields["total_threads"] = int64(0)
}
return fields
}
// exec `ps` to get all process states
func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
out, err := p.execPS()
if err != nil {
return err
}
for i, status := range bytes.Fields(out) {
if i == 0 && string(status) == "STAT" {
// This is a header, skip it
continue
}
switch status[0] {
case 'W':
fields["wait"] = fields["wait"].(int64) + int64(1)
case 'U', 'D', 'L':
// Also known as uninterruptible sleep or disk sleep
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombie"] = fields["zombie"].(int64) + int64(1)
case 'T':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'I':
fields["idle"] = fields["idle"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] from ps",
string(status[0]))
}
fields["total"] = fields["total"].(int64) + int64(1)
}
return nil
}
// get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
continue
}
statFile := path.Join("/proc", file.Name(), "stat")
data, err := p.readProcFile(statFile)
if err != nil {
return err
}
if data == nil {
continue
}
stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
}
switch stats[2][0] {
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'D':
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombies"] = fields["zombies"].(int64) + int64(1)
case 'T', 't':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'W':
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[2][0]), statFile)
}
fields["total"] = fields["total"].(int64) + int64(1)
threads, err := strconv.Atoi(string(stats[19]))
if err != nil {
log.Printf("processes: Error parsing thread count: %s", err)
continue
}
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
}
return nil
}
func readProcFile(statFile string) ([]byte, error) {
if _, err := os.Stat(statFile); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(statFile)
if err != nil {
return nil, err
}
return data, nil
}
func execPS() ([]byte, error) {
bin, err := exec.LookPath("ps")
if err != nil {
return nil, err
}
out, err := exec.Command(bin, "axo", "state").Output()
if err != nil {
return nil, err
}
return out, err
}
func init() { func init() {
inputs.Add("processes", func() telegraf.Input { inputs.Add("processes", func() telegraf.Input {
return &Processes{} return &Processes{
execPS: execPS,
readProcFile: readProcFile,
}
}) })
} }

View File

@ -1,6 +1,8 @@
package system package system
import ( import (
"fmt"
"runtime"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -9,13 +11,136 @@ import (
) )
func TestProcesses(t *testing.T) { func TestProcesses(t *testing.T) {
processes := &Processes{} processes := &Processes{
execPS: execPS,
readProcFile: readProcFile,
}
var acc testutil.Accumulator var acc testutil.Accumulator
err := processes.Gather(&acc) err := processes.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasUIntField("processes", "running")) assert.True(t, acc.HasIntField("processes", "running"))
assert.True(t, acc.HasUIntField("processes", "sleeping")) assert.True(t, acc.HasIntField("processes", "sleeping"))
assert.True(t, acc.HasUIntField("processes", "stopped")) assert.True(t, acc.HasIntField("processes", "stopped"))
assert.True(t, acc.HasIntField("processes", "total"))
total, ok := acc.Get("processes")
require.True(t, ok)
assert.True(t, total.Fields["total"].(int64) > 0)
} }
func TestFromPS(t *testing.T) {
processes := &Processes{
execPS: testExecPS,
forcePS: true,
}
var acc testutil.Accumulator
err := processes.Gather(&acc)
require.NoError(t, err)
fields := getEmptyFields()
fields["blocked"] = int64(1)
fields["running"] = int64(4)
fields["sleeping"] = int64(34)
fields["total"] = int64(39)
acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
}
func TestFromPSError(t *testing.T) {
processes := &Processes{
execPS: testExecPSError,
forcePS: true,
}
var acc testutil.Accumulator
err := processes.Gather(&acc)
require.Error(t, err)
}
func TestFromProcFiles(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("This test only runs on linux")
}
tester := tester{}
processes := &Processes{
readProcFile: tester.testProcFile,
forceProc: true,
}
var acc testutil.Accumulator
err := processes.Gather(&acc)
require.NoError(t, err)
fields := getEmptyFields()
fields["sleeping"] = tester.calls
fields["total_threads"] = tester.calls * 2
fields["total"] = tester.calls
acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
}
func testExecPS() ([]byte, error) {
return []byte(testPSOut), nil
}
// struct for counting calls to testProcFile
type tester struct {
calls int64
}
func (t *tester) testProcFile(_ string) ([]byte, error) {
t.calls++
return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil
}
func testExecPSError() ([]byte, error) {
return []byte(testPSOut), fmt.Errorf("ERROR!")
}
const testPSOut = `
STAT
S
S
S
S
R
R
S
S
Ss
Ss
S
SNs
Ss
Ss
S
R+
S
U
S
S
S
S
Ss
S+
Ss
S
S+
S+
Ss
S+
Ss
S
R+
Ss
S
S+
S+
Ss
S+
`
const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0
`

View File

@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml
exit_if_fail telegraf -config $tmpdir/config.toml \ exit_if_fail telegraf -config $tmpdir/config.toml \
-test -input-filter cpu:mem -test -input-filter cpu:mem
mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz
eval "git describe --exact-match HEAD" eval "git describe --exact-match HEAD"
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then