parent
ca3df67156
commit
d4fe485d18
|
@ -14,6 +14,7 @@
|
||||||
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
|
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
|
||||||
- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert!
|
- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert!
|
||||||
- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998!
|
- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998!
|
||||||
|
- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert!
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
|
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
|
||||||
|
@ -24,6 +25,7 @@
|
||||||
- [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert!
|
- [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert!
|
||||||
- [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package
|
- [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package
|
||||||
- [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory
|
- [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory
|
||||||
|
- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable.
|
||||||
|
|
||||||
## v0.10.4.1
|
## v0.10.4.1
|
||||||
|
|
||||||
|
|
|
@ -214,11 +214,13 @@ Currently implemented sources:
|
||||||
* disk
|
* disk
|
||||||
* diskio
|
* diskio
|
||||||
* swap
|
* swap
|
||||||
|
* processes
|
||||||
|
|
||||||
Telegraf can also collect metrics via the following service plugins:
|
Telegraf can also collect metrics via the following service plugins:
|
||||||
|
|
||||||
* statsd
|
* statsd
|
||||||
* udp listener
|
* udp_listener
|
||||||
|
* tcp_listener
|
||||||
* mqtt_consumer
|
* mqtt_consumer
|
||||||
* kafka_consumer
|
* kafka_consumer
|
||||||
* nats_consumer
|
* nats_consumer
|
||||||
|
|
|
@ -123,6 +123,10 @@
|
||||||
[[inputs.mem]]
|
[[inputs.mem]]
|
||||||
# no configuration
|
# no configuration
|
||||||
|
|
||||||
|
# Get the number of processes and group them by status
|
||||||
|
[[inputs.processes]]
|
||||||
|
# no configuration
|
||||||
|
|
||||||
# Read metrics about swap memory usage
|
# Read metrics about swap memory usage
|
||||||
[[inputs.swap]]
|
[[inputs.swap]]
|
||||||
# no configuration
|
# no configuration
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
# Processes Input Plugin
|
||||||
|
|
||||||
|
This plugin gathers info about the total number of processes and groups
|
||||||
|
them by status (zombie, sleeping, running, etc.)
|
||||||
|
|
||||||
|
On linux this plugin requires access to procfs (/proc), on other OSes
|
||||||
|
it requires access to execute `ps`.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Get the number of processes and group them by status
|
||||||
|
[[inputs.processes]]
|
||||||
|
# no configuration
|
||||||
|
```
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
- processes
|
||||||
|
- blocked (aka disk sleep or uninterruptible sleep)
|
||||||
|
- running
|
||||||
|
- sleeping
|
||||||
|
- stopped
|
||||||
|
- total
|
||||||
|
- zombie
|
||||||
|
- wait (freebsd only)
|
||||||
|
- idle (bsd only)
|
||||||
|
- paging (linux only)
|
||||||
|
- total_threads (linux only)
|
||||||
|
|
||||||
|
### Process State Mappings
|
||||||
|
|
||||||
|
Different OSes use slightly different State codes for their processes, these
|
||||||
|
state codes are documented in `man ps`, and I will give a mapping of what major
|
||||||
|
OS state codes correspond to in telegraf metrics:
|
||||||
|
|
||||||
|
```
|
||||||
|
Linux FreeBSD Darwin meaning
|
||||||
|
R R R running
|
||||||
|
S S S sleeping
|
||||||
|
Z Z Z zombie
|
||||||
|
T T T stopped
|
||||||
|
none I I idle (sleeping for longer than about 20 seconds)
|
||||||
|
D D,L U blocked (waiting in uninterruptible sleep, or locked)
|
||||||
|
W W none paging (linux kernel < 2.6 only), wait (freebsd)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
None
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test
|
||||||
|
* Plugin: processes, Collection 1
|
||||||
|
> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042
|
||||||
|
```
|
|
@ -1,61 +1,216 @@
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
package system
|
package system
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/shirou/gopsutil/process"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Processes struct {
|
type Processes struct {
|
||||||
|
execPS func() ([]byte, error)
|
||||||
|
readProcFile func(statFile string) ([]byte, error)
|
||||||
|
|
||||||
|
forcePS bool
|
||||||
|
forceProc bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ *Processes) Description() string {
|
func (p *Processes) Description() string {
|
||||||
return "Get the number of processes and group them by status (Linux only)"
|
return "Get the number of processes and group them by status"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ *Processes) SampleConfig() string { return "" }
|
func (p *Processes) SampleConfig() string { return "" }
|
||||||
|
|
||||||
func (s *Processes) Gather(acc telegraf.Accumulator) error {
|
func (p *Processes) Gather(acc telegraf.Accumulator) error {
|
||||||
pids, err := process.Pids()
|
// Get an empty map of metric fields
|
||||||
if err != nil {
|
fields := getEmptyFields()
|
||||||
return fmt.Errorf("error getting pids list: %s", err)
|
|
||||||
|
// Decide if we will use 'ps' to get stats (use procfs otherwise)
|
||||||
|
usePS := true
|
||||||
|
if runtime.GOOS == "linux" {
|
||||||
|
usePS = false
|
||||||
}
|
}
|
||||||
// TODO handle other OS (Windows/BSD/Solaris/OSX)
|
if p.forcePS {
|
||||||
fields := map[string]interface{}{
|
usePS = true
|
||||||
"paging": uint64(0),
|
} else if p.forceProc {
|
||||||
"blocked": uint64(0),
|
usePS = false
|
||||||
"zombie": uint64(0),
|
|
||||||
"stopped": uint64(0),
|
|
||||||
"running": uint64(0),
|
|
||||||
"sleeping": uint64(0),
|
|
||||||
}
|
}
|
||||||
for _, pid := range pids {
|
|
||||||
process, err := process.NewProcess(pid)
|
// Gather stats from 'ps' or procfs
|
||||||
if err != nil {
|
if usePS {
|
||||||
log.Printf("Can not get process %d status: %s", pid, err)
|
if err := p.gatherFromPS(fields); err != nil {
|
||||||
continue
|
return err
|
||||||
}
|
}
|
||||||
status, err := process.Status()
|
} else {
|
||||||
if err != nil {
|
if err := p.gatherFromProc(fields); err != nil {
|
||||||
log.Printf("Can not get process %d status: %s\n", pid, err)
|
return err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
_, exists := fields[status]
|
|
||||||
if !exists {
|
|
||||||
log.Printf("Status '%s' for process with pid: %d\n", status, pid)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fields[status] = fields[status].(uint64) + uint64(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
acc.AddFields("processes", fields, nil)
|
acc.AddFields("processes", fields, nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gets empty fields of metrics based on the OS
|
||||||
|
func getEmptyFields() map[string]interface{} {
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"blocked": int64(0),
|
||||||
|
"zombie": int64(0),
|
||||||
|
"stopped": int64(0),
|
||||||
|
"running": int64(0),
|
||||||
|
"sleeping": int64(0),
|
||||||
|
"total": int64(0),
|
||||||
|
}
|
||||||
|
switch runtime.GOOS {
|
||||||
|
case "freebsd":
|
||||||
|
fields["idle"] = int64(0)
|
||||||
|
fields["wait"] = int64(0)
|
||||||
|
case "darwin":
|
||||||
|
fields["idle"] = int64(0)
|
||||||
|
case "openbsd":
|
||||||
|
fields["idle"] = int64(0)
|
||||||
|
case "linux":
|
||||||
|
fields["paging"] = int64(0)
|
||||||
|
fields["total_threads"] = int64(0)
|
||||||
|
}
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
|
// exec `ps` to get all process states
|
||||||
|
func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
|
||||||
|
out, err := p.execPS()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, status := range bytes.Fields(out) {
|
||||||
|
if i == 0 && string(status) == "STAT" {
|
||||||
|
// This is a header, skip it
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch status[0] {
|
||||||
|
case 'W':
|
||||||
|
fields["wait"] = fields["wait"].(int64) + int64(1)
|
||||||
|
case 'U', 'D', 'L':
|
||||||
|
// Also known as uninterruptible sleep or disk sleep
|
||||||
|
fields["blocked"] = fields["blocked"].(int64) + int64(1)
|
||||||
|
case 'Z':
|
||||||
|
fields["zombie"] = fields["zombie"].(int64) + int64(1)
|
||||||
|
case 'T':
|
||||||
|
fields["stopped"] = fields["stopped"].(int64) + int64(1)
|
||||||
|
case 'R':
|
||||||
|
fields["running"] = fields["running"].(int64) + int64(1)
|
||||||
|
case 'S':
|
||||||
|
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
|
||||||
|
case 'I':
|
||||||
|
fields["idle"] = fields["idle"].(int64) + int64(1)
|
||||||
|
default:
|
||||||
|
log.Printf("processes: Unknown state [ %s ] from ps",
|
||||||
|
string(status[0]))
|
||||||
|
}
|
||||||
|
fields["total"] = fields["total"].(int64) + int64(1)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// get process states from /proc/(pid)/stat files
|
||||||
|
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
|
||||||
|
files, err := ioutil.ReadDir("/proc")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range files {
|
||||||
|
if !file.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
statFile := path.Join("/proc", file.Name(), "stat")
|
||||||
|
data, err := p.readProcFile(statFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if data == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stats := bytes.Fields(data)
|
||||||
|
if len(stats) < 3 {
|
||||||
|
return fmt.Errorf("Something is terribly wrong with %s", statFile)
|
||||||
|
}
|
||||||
|
switch stats[2][0] {
|
||||||
|
case 'R':
|
||||||
|
fields["running"] = fields["running"].(int64) + int64(1)
|
||||||
|
case 'S':
|
||||||
|
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
|
||||||
|
case 'D':
|
||||||
|
fields["blocked"] = fields["blocked"].(int64) + int64(1)
|
||||||
|
case 'Z':
|
||||||
|
fields["zombies"] = fields["zombies"].(int64) + int64(1)
|
||||||
|
case 'T', 't':
|
||||||
|
fields["stopped"] = fields["stopped"].(int64) + int64(1)
|
||||||
|
case 'W':
|
||||||
|
fields["paging"] = fields["paging"].(int64) + int64(1)
|
||||||
|
default:
|
||||||
|
log.Printf("processes: Unknown state [ %s ] in file %s",
|
||||||
|
string(stats[2][0]), statFile)
|
||||||
|
}
|
||||||
|
fields["total"] = fields["total"].(int64) + int64(1)
|
||||||
|
|
||||||
|
threads, err := strconv.Atoi(string(stats[19]))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("processes: Error parsing thread count: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readProcFile(statFile string) ([]byte, error) {
|
||||||
|
if _, err := os.Stat(statFile); os.IsNotExist(err) {
|
||||||
|
return nil, nil
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := ioutil.ReadFile(statFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func execPS() ([]byte, error) {
|
||||||
|
bin, err := exec.LookPath("ps")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := exec.Command(bin, "axo", "state").Output()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, err
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("processes", func() telegraf.Input {
|
inputs.Add("processes", func() telegraf.Input {
|
||||||
return &Processes{}
|
return &Processes{
|
||||||
|
execPS: execPS,
|
||||||
|
readProcFile: readProcFile,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package system
|
package system
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -9,13 +11,136 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProcesses(t *testing.T) {
|
func TestProcesses(t *testing.T) {
|
||||||
processes := &Processes{}
|
processes := &Processes{
|
||||||
|
execPS: execPS,
|
||||||
|
readProcFile: readProcFile,
|
||||||
|
}
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
err := processes.Gather(&acc)
|
err := processes.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.True(t, acc.HasUIntField("processes", "running"))
|
assert.True(t, acc.HasIntField("processes", "running"))
|
||||||
assert.True(t, acc.HasUIntField("processes", "sleeping"))
|
assert.True(t, acc.HasIntField("processes", "sleeping"))
|
||||||
assert.True(t, acc.HasUIntField("processes", "stopped"))
|
assert.True(t, acc.HasIntField("processes", "stopped"))
|
||||||
|
assert.True(t, acc.HasIntField("processes", "total"))
|
||||||
|
total, ok := acc.Get("processes")
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.True(t, total.Fields["total"].(int64) > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFromPS(t *testing.T) {
|
||||||
|
processes := &Processes{
|
||||||
|
execPS: testExecPS,
|
||||||
|
forcePS: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := processes.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fields := getEmptyFields()
|
||||||
|
fields["blocked"] = int64(1)
|
||||||
|
fields["running"] = int64(4)
|
||||||
|
fields["sleeping"] = int64(34)
|
||||||
|
fields["total"] = int64(39)
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromPSError(t *testing.T) {
|
||||||
|
processes := &Processes{
|
||||||
|
execPS: testExecPSError,
|
||||||
|
forcePS: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := processes.Gather(&acc)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromProcFiles(t *testing.T) {
|
||||||
|
if runtime.GOOS != "linux" {
|
||||||
|
t.Skip("This test only runs on linux")
|
||||||
|
}
|
||||||
|
tester := tester{}
|
||||||
|
processes := &Processes{
|
||||||
|
readProcFile: tester.testProcFile,
|
||||||
|
forceProc: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := processes.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fields := getEmptyFields()
|
||||||
|
fields["sleeping"] = tester.calls
|
||||||
|
fields["total_threads"] = tester.calls * 2
|
||||||
|
fields["total"] = tester.calls
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testExecPS() ([]byte, error) {
|
||||||
|
return []byte(testPSOut), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// struct for counting calls to testProcFile
|
||||||
|
type tester struct {
|
||||||
|
calls int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tester) testProcFile(_ string) ([]byte, error) {
|
||||||
|
t.calls++
|
||||||
|
return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func testExecPSError() ([]byte, error) {
|
||||||
|
return []byte(testPSOut), fmt.Errorf("ERROR!")
|
||||||
|
}
|
||||||
|
|
||||||
|
const testPSOut = `
|
||||||
|
STAT
|
||||||
|
S
|
||||||
|
S
|
||||||
|
S
|
||||||
|
S
|
||||||
|
R
|
||||||
|
R
|
||||||
|
S
|
||||||
|
S
|
||||||
|
Ss
|
||||||
|
Ss
|
||||||
|
S
|
||||||
|
SNs
|
||||||
|
Ss
|
||||||
|
Ss
|
||||||
|
S
|
||||||
|
R+
|
||||||
|
S
|
||||||
|
U
|
||||||
|
S
|
||||||
|
S
|
||||||
|
S
|
||||||
|
S
|
||||||
|
Ss
|
||||||
|
S+
|
||||||
|
Ss
|
||||||
|
S
|
||||||
|
S+
|
||||||
|
S+
|
||||||
|
Ss
|
||||||
|
S+
|
||||||
|
Ss
|
||||||
|
S
|
||||||
|
R+
|
||||||
|
Ss
|
||||||
|
S
|
||||||
|
S+
|
||||||
|
S+
|
||||||
|
Ss
|
||||||
|
S+
|
||||||
|
`
|
||||||
|
|
||||||
|
const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0
|
||||||
|
`
|
||||||
|
|
|
@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml
|
||||||
exit_if_fail telegraf -config $tmpdir/config.toml \
|
exit_if_fail telegraf -config $tmpdir/config.toml \
|
||||||
-test -input-filter cpu:mem
|
-test -input-filter cpu:mem
|
||||||
|
|
||||||
mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS
|
cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz
|
||||||
|
|
||||||
eval "git describe --exact-match HEAD"
|
eval "git describe --exact-match HEAD"
|
||||||
if [ $? -eq 0 ]; then
|
if [ $? -eq 0 ]; then
|
||||||
|
|
Loading…
Reference in New Issue