add sysstat plugin
This commit is contained in:
315
plugins/inputs/sysstat/sysstat.go
Normal file
315
plugins/inputs/sysstat/sysstat.go
Normal file
@@ -0,0 +1,315 @@
|
||||
// build +linux
|
||||
|
||||
package sysstat
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
var (
|
||||
execCommand = exec.Command // execCommand is used to mock commands in tests.
|
||||
dfltActivities = []string{"DISK"}
|
||||
)
|
||||
|
||||
const parseInterval = 1 // parseInterval is the interval (in seconds) where the parsing takes place.
|
||||
|
||||
type Sysstat struct {
|
||||
// Interval that defines how long data is collected by Sadc cmd.
|
||||
//
|
||||
// This value has to be the same as the thelegraf collection interval.
|
||||
Interval int `toml:"collect_interval"`
|
||||
|
||||
// Sadc represents the path to the sadc collector utility.
|
||||
Sadc string `toml:"sadc_path"`
|
||||
|
||||
// Sadf represents the path to the sadf cmd.
|
||||
Sadf string `toml:"sadf_path"`
|
||||
|
||||
// Activities is a list of activities that are passed as argument to the
|
||||
// collector utility (e.g: DISK, SNMP etc...)
|
||||
// The more activities that are added, the more data is collected.
|
||||
Activities []string
|
||||
|
||||
// Options is a map of options.
|
||||
//
|
||||
// The key represents the actual option that the Sadf command is called with and
|
||||
// the value represents the description for that option.
|
||||
//
|
||||
// For example, if you have the following options map:
|
||||
// map[string]string{"-C": "cpu", "-d": "disk"}
|
||||
// The Sadf command is run with the options -C and -d to extract cpu and
|
||||
// disk metrics from the collected binary file.
|
||||
//
|
||||
// If Group is false (see below), each metric will be prefixed with the corresponding description
|
||||
// and represents itself a measurement.
|
||||
//
|
||||
// If Group is true, metrics are grouped to a single measurement with the corresponding description as name.
|
||||
Options map[string]string
|
||||
|
||||
// Group determines if metrics are grouped or not.
|
||||
Group bool
|
||||
|
||||
// DeviceTags adds the possibility to add additional tags for devices.
|
||||
DeviceTags map[string][]map[string]string `toml:"device_tags"`
|
||||
tmpFile string
|
||||
}
|
||||
|
||||
func (*Sysstat) Description() string {
|
||||
return "Sysstat metrics collector"
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## Collect interval in seconds. This value has to be equal
|
||||
## to the telegraf collect interval.
|
||||
collect_interval = 5 # required
|
||||
#
|
||||
#
|
||||
## Path to the sadc command.
|
||||
sadc_path = "/usr/lib/sa/sadc" # required
|
||||
#
|
||||
#
|
||||
## Path to the sadf command, if it is not in PATH
|
||||
# sadf_path = "/usr/bin/sadf"
|
||||
#
|
||||
#
|
||||
## Activities is a list of activities, that are passed as argument to the
|
||||
## sadc collector utility (e.g: DISK, SNMP etc...)
|
||||
## The more activities that are added, the more data is collected.
|
||||
# activities = ["DISK"]
|
||||
#
|
||||
#
|
||||
## Group metrics to measurements.
|
||||
##
|
||||
## If group is false each metric will be prefixed with a description
|
||||
## and represents itself a measurement.
|
||||
##
|
||||
## If Group is true, corresponding metrics are grouped to a single measurement.
|
||||
# group = false
|
||||
#
|
||||
#
|
||||
## Options for the sasf command. The values on the left represent the sadf options and
|
||||
## the values on the right their description (wich are used for grouping and prefixing metrics).
|
||||
[inputs.sysstat.options]
|
||||
-C = "cpu"
|
||||
-B = "paging"
|
||||
-b = "io"
|
||||
-d = "disk" # requires DISK activity
|
||||
-H = "hugepages"
|
||||
"-I ALL" = "interrupts" # requires INT activity
|
||||
"-n ALL" = "network"
|
||||
"-P ALL" = "per_cpu"
|
||||
-q = "queue"
|
||||
-R = "mem"
|
||||
"-r ALL" = "mem_util"
|
||||
-S = "swap_util"
|
||||
-u = "cpu_util"
|
||||
-v = "inode"
|
||||
-W = "swap"
|
||||
-w = "task"
|
||||
#
|
||||
#
|
||||
## Device tags can be used to add additional tags for devices. For example the configuration below
|
||||
## adds a tag vg with value rootvg for all metrics with sda devices.
|
||||
# [[inputs.sysstat.device_tags.sda]]
|
||||
# vg = "rootvg"
|
||||
`
|
||||
|
||||
func (*Sysstat) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (s *Sysstat) Gather(acc telegraf.Accumulator) error {
|
||||
ts := time.Now().Add(time.Duration(s.Interval) * time.Second)
|
||||
if err := s.collect(); err != nil {
|
||||
return err
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
errorChannel := make(chan error, len(s.Options)*2)
|
||||
for option := range s.Options {
|
||||
wg.Add(1)
|
||||
go func(acc telegraf.Accumulator, option string) {
|
||||
defer wg.Done()
|
||||
if err := s.parse(acc, option, ts); err != nil {
|
||||
errorChannel <- err
|
||||
}
|
||||
}(acc, option)
|
||||
}
|
||||
wg.Wait()
|
||||
close(errorChannel)
|
||||
|
||||
errorStrings := []string{}
|
||||
for err := range errorChannel {
|
||||
errorStrings = append(errorStrings, err.Error())
|
||||
}
|
||||
|
||||
if _, err := os.Stat(s.tmpFile); err == nil {
|
||||
if err := os.Remove(s.tmpFile); err != nil {
|
||||
errorStrings = append(errorStrings, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errorStrings) == 0 {
|
||||
return nil
|
||||
}
|
||||
return errors.New(strings.Join(errorStrings, "\n"))
|
||||
}
|
||||
|
||||
// collect collects sysstat data with the collector utility sadc. It runs the following command:
|
||||
// Sadc -S <Activity1> -S <Activity2> ... <collectInterval> 2 tmpFile
|
||||
// The above command collects system metrics during <collectInterval> and saves it in binary form to tmpFile.
|
||||
func (s *Sysstat) collect() error {
|
||||
if len(s.Activities) == 0 {
|
||||
s.Activities = dfltActivities
|
||||
}
|
||||
options := []string{}
|
||||
for _, act := range s.Activities {
|
||||
options = append(options, "-S", act)
|
||||
}
|
||||
s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix()))
|
||||
collectInterval := s.Interval - parseInterval // collectInterval has to be smaller than the telegraf data collection interval
|
||||
options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile)
|
||||
cmd := execCommand(s.Sadc, options...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to run command %s: %s", strings.Join(cmd.Args, " "), string(out))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// parse runs Sadf on the previously saved tmpFile:
|
||||
// Sadf -p -- -p <option> tmpFile
|
||||
// and parses the output to add it to the telegraf.Accumulator acc.
|
||||
func (s *Sysstat) parse(acc telegraf.Accumulator, option string, ts time.Time) error {
|
||||
if len(s.Sadf) == 0 {
|
||||
sadf, err := exec.LookPath("sadf")
|
||||
if err != nil {
|
||||
return errors.New("sadf not in $PATH, configure path to sadf")
|
||||
}
|
||||
s.Sadf = sadf
|
||||
}
|
||||
cmd := execCommand(s.Sadf, s.sadfOptions(option)...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return fmt.Errorf("running command '%s' failed: %s", strings.Join(cmd.Args, " "), err)
|
||||
}
|
||||
|
||||
r := bufio.NewReader(stdout)
|
||||
csv := csv.NewReader(r)
|
||||
csv.Comma = '\t'
|
||||
csv.FieldsPerRecord = 6
|
||||
var measurement string
|
||||
// groupData to accumulate data when Group=true
|
||||
type groupData struct {
|
||||
tags map[string]string
|
||||
fields map[string]interface{}
|
||||
}
|
||||
m := make(map[string]groupData)
|
||||
for {
|
||||
record, err := csv.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
device := record[3]
|
||||
value, err := strconv.ParseFloat(record[5], 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{}
|
||||
if device != "-" {
|
||||
tags["device"] = device
|
||||
if addTags, ok := s.DeviceTags[device]; ok {
|
||||
for _, tag := range addTags {
|
||||
for k, v := range tag {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if s.Group {
|
||||
measurement = s.Options[option]
|
||||
if _, ok := m[device]; !ok {
|
||||
m[device] = groupData{
|
||||
fields: make(map[string]interface{}),
|
||||
tags: make(map[string]string),
|
||||
}
|
||||
}
|
||||
g, _ := m[device]
|
||||
if len(g.tags) == 0 {
|
||||
for k, v := range tags {
|
||||
g.tags[k] = v
|
||||
}
|
||||
}
|
||||
g.fields[escape(record[4])] = value
|
||||
} else {
|
||||
measurement = s.Options[option] + "_" + escape(record[4])
|
||||
fields := map[string]interface{}{
|
||||
"value": value,
|
||||
}
|
||||
acc.AddFields(measurement, fields, tags, ts)
|
||||
}
|
||||
|
||||
}
|
||||
if s.Group {
|
||||
for _, v := range m {
|
||||
acc.AddFields(measurement, v.fields, v.tags, ts)
|
||||
}
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("command %s failed with %s", strings.Join(cmd.Args, " "), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sadfOptions creates the correct options for the sadf utility.
|
||||
func (s *Sysstat) sadfOptions(activityOption string) []string {
|
||||
options := []string{
|
||||
"-p",
|
||||
"--",
|
||||
"-p",
|
||||
}
|
||||
|
||||
opts := strings.Split(activityOption, " ")
|
||||
options = append(options, opts...)
|
||||
options = append(options, s.tmpFile)
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// escape removes % and / chars in field names
|
||||
func escape(dirty string) string {
|
||||
var fieldEscaper = strings.NewReplacer(
|
||||
`%`, "pct_",
|
||||
`/`, "_per_",
|
||||
)
|
||||
return fieldEscaper.Replace(dirty)
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("sysstat", func() telegraf.Input {
|
||||
return &Sysstat{}
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user