Procstat: don't cache PIDs (#2206)
* Procstat: don't cache PIDs Changed the procstat input plugin to not cache PIDs. Solves #1636. The logic of creating a process by pid was moved from `procstat.go` to `spec_processor.go`. * Procstat: go fmt * procstat: modify changelog for #2206
This commit is contained in:
parent
036d1beb87
commit
2a32cba35b
|
@ -45,6 +45,8 @@ It is highly recommended that all users migrate to the new riemann output plugin
|
||||||
|
|
||||||
- [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int.
|
- [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int.
|
||||||
- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection.
|
- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection.
|
||||||
|
- [#2287](https://github.com/influxdata/telegraf/issues/2287): Kubernetes input: Handle null startTime for stopped pods
|
||||||
|
- [#1636](https://github.com/influxdata/telegraf/issues/1636): procstat - stop caching PIDs.
|
||||||
- [#2318](https://github.com/influxdata/telegraf/issues/2318): haproxy input - Add missing fields.
|
- [#2318](https://github.com/influxdata/telegraf/issues/2318): haproxy input - Add missing fields.
|
||||||
- [#2287](https://github.com/influxdata/telegraf/issues/2287): Kubernetes input: Handle null startTime for stopped pods.
|
- [#2287](https://github.com/influxdata/telegraf/issues/2287): Kubernetes input: Handle null startTime for stopped pods.
|
||||||
- [#2356](https://github.com/influxdata/telegraf/issues/2356): cpu input panic when /proc/stat is empty.
|
- [#2356](https://github.com/influxdata/telegraf/issues/2356): cpu input panic when /proc/stat is empty.
|
||||||
|
|
|
@ -8,8 +8,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/shirou/gopsutil/process"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
@ -23,15 +21,12 @@ type Procstat struct {
|
||||||
User string
|
User string
|
||||||
PidTag bool
|
PidTag bool
|
||||||
|
|
||||||
// pidmap maps a pid to a process object, so we don't recreate every gather
|
|
||||||
pidmap map[int32]*process.Process
|
|
||||||
// tagmap maps a pid to a map of tags for that pid
|
// tagmap maps a pid to a map of tags for that pid
|
||||||
tagmap map[int32]map[string]string
|
tagmap map[int32]map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProcstat() *Procstat {
|
func NewProcstat() *Procstat {
|
||||||
return &Procstat{
|
return &Procstat{
|
||||||
pidmap: make(map[int32]*process.Process),
|
|
||||||
tagmap: make(map[int32]map[string]string),
|
tagmap: make(map[int32]map[string]string),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,51 +62,26 @@ func (_ *Procstat) Description() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
|
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
|
||||||
err := p.createProcesses()
|
pids, err := p.getAllPids()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
|
log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
|
||||||
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
|
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
|
||||||
} else {
|
} else {
|
||||||
for pid, proc := range p.pidmap {
|
for _, pid := range pids {
|
||||||
if p.PidTag {
|
if p.PidTag {
|
||||||
p.tagmap[pid]["pid"] = fmt.Sprint(pid)
|
p.tagmap[pid]["pid"] = fmt.Sprint(pid)
|
||||||
}
|
}
|
||||||
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid])
|
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, p.tagmap[pid])
|
||||||
p.pushMetrics()
|
err := p.pushMetrics()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! Error: procstat: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Procstat) createProcesses() error {
|
|
||||||
var errstring string
|
|
||||||
var outerr error
|
|
||||||
|
|
||||||
pids, err := p.getAllPids()
|
|
||||||
if err != nil {
|
|
||||||
errstring += err.Error() + " "
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, pid := range pids {
|
|
||||||
_, ok := p.pidmap[pid]
|
|
||||||
if !ok {
|
|
||||||
proc, err := process.NewProcess(pid)
|
|
||||||
if err == nil {
|
|
||||||
p.pidmap[pid] = proc
|
|
||||||
} else {
|
|
||||||
errstring += err.Error() + " "
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if errstring != "" {
|
|
||||||
outerr = fmt.Errorf("%s", errstring)
|
|
||||||
}
|
|
||||||
|
|
||||||
return outerr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Procstat) getAllPids() ([]int32, error) {
|
func (p *Procstat) getAllPids() ([]int32, error) {
|
||||||
var pids []int32
|
var pids []int32
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/shirou/gopsutil/process"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
@ -24,7 +23,6 @@ func TestGather(t *testing.T) {
|
||||||
p := Procstat{
|
p := Procstat{
|
||||||
PidFile: file.Name(),
|
PidFile: file.Name(),
|
||||||
Prefix: "foo",
|
Prefix: "foo",
|
||||||
pidmap: make(map[int32]*process.Process),
|
|
||||||
tagmap: make(map[int32]map[string]string),
|
tagmap: make(map[int32]map[string]string),
|
||||||
}
|
}
|
||||||
p.Gather(&acc)
|
p.Gather(&acc)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package procstat
|
package procstat
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/shirou/gopsutil/process"
|
"github.com/shirou/gopsutil/process"
|
||||||
|
@ -9,12 +10,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type SpecProcessor struct {
|
type SpecProcessor struct {
|
||||||
Prefix string
|
ProcessName string
|
||||||
pid int32
|
Prefix string
|
||||||
tags map[string]string
|
pid int32
|
||||||
fields map[string]interface{}
|
tags map[string]string
|
||||||
acc telegraf.Accumulator
|
fields map[string]interface{}
|
||||||
proc *process.Process
|
acc telegraf.Accumulator
|
||||||
|
proc *process.Process
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSpecProcessor(
|
func NewSpecProcessor(
|
||||||
|
@ -22,29 +24,35 @@ func NewSpecProcessor(
|
||||||
prefix string,
|
prefix string,
|
||||||
pid int32,
|
pid int32,
|
||||||
acc telegraf.Accumulator,
|
acc telegraf.Accumulator,
|
||||||
p *process.Process,
|
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) *SpecProcessor {
|
) *SpecProcessor {
|
||||||
if processName != "" {
|
|
||||||
tags["process_name"] = processName
|
|
||||||
} else {
|
|
||||||
name, err := p.Name()
|
|
||||||
if err == nil {
|
|
||||||
tags["process_name"] = name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &SpecProcessor{
|
return &SpecProcessor{
|
||||||
Prefix: prefix,
|
ProcessName: processName,
|
||||||
pid: pid,
|
Prefix: prefix,
|
||||||
tags: tags,
|
pid: pid,
|
||||||
fields: make(map[string]interface{}),
|
tags: tags,
|
||||||
acc: acc,
|
fields: make(map[string]interface{}),
|
||||||
proc: p,
|
acc: acc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SpecProcessor) pushMetrics() {
|
func (p *SpecProcessor) pushMetrics() error {
|
||||||
var prefix string
|
var prefix string
|
||||||
|
proc, err := process.NewProcess(p.pid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to open process with pid '%d'. Error: '%s'",
|
||||||
|
p.pid, err)
|
||||||
|
}
|
||||||
|
p.proc = proc
|
||||||
|
if p.ProcessName != "" {
|
||||||
|
p.tags["process_name"] = p.ProcessName
|
||||||
|
} else {
|
||||||
|
name, err := p.proc.Name()
|
||||||
|
if err == nil {
|
||||||
|
p.tags["process_name"] = name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if p.Prefix != "" {
|
if p.Prefix != "" {
|
||||||
prefix = p.Prefix + "_"
|
prefix = p.Prefix + "_"
|
||||||
}
|
}
|
||||||
|
@ -107,4 +115,5 @@ func (p *SpecProcessor) pushMetrics() {
|
||||||
}
|
}
|
||||||
|
|
||||||
p.acc.AddFields("procstat", fields, p.tags)
|
p.acc.AddFields("procstat", fields, p.tags)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue