// build +linux package sysstat import ( "bufio" "encoding/csv" "errors" "fmt" "io" "os" "os/exec" "path" "strconv" "strings" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) var ( firstTimestamp time.Time execCommand = exec.Command // execCommand is used to mock commands in tests. dfltActivities = []string{"DISK"} ) const parseInterval = 1 // parseInterval is the interval (in seconds) where the parsing of the binary file takes place. type Sysstat struct { // Sadc represents the path to the sadc collector utility. Sadc string `toml:"sadc_path"` // Sadf represents the path to the sadf cmd. Sadf string `toml:"sadf_path"` // Activities is a list of activities that are passed as argument to the // collector utility (e.g: DISK, SNMP etc...) // The more activities that are added, the more data is collected. Activities []string // Options is a map of options. // // The key represents the actual option that the Sadf command is called with and // the value represents the description for that option. // // For example, if you have the following options map: // map[string]string{"-C": "cpu", "-d": "disk"} // The Sadf command is run with the options -C and -d to extract cpu and // disk metrics from the collected binary file. // // If Group is false (see below), each metric will be prefixed with the corresponding description // and represents itself a measurement. // // If Group is true, metrics are grouped to a single measurement with the corresponding description as name. Options map[string]string // Group determines if metrics are grouped or not. Group bool // DeviceTags adds the possibility to add additional tags for devices. DeviceTags map[string][]map[string]string `toml:"device_tags"` tmpFile string interval int } func (*Sysstat) Description() string { return "Sysstat metrics collector" } var sampleConfig = ` ## Path to the sadc command. sadc_path = "/usr/lib/sa/sadc" # required # # ## Path to the sadf command, if it is not in PATH # sadf_path = "/usr/bin/sadf" # # ## Activities is a list of activities, that are passed as argument to the ## sadc collector utility (e.g: DISK, SNMP etc...) ## The more activities that are added, the more data is collected. # activities = ["DISK"] # # ## Group metrics to measurements. ## ## If group is false each metric will be prefixed with a description ## and represents itself a measurement. ## ## If Group is true, corresponding metrics are grouped to a single measurement. # group = false # # ## Options for the sadf command. The values on the left represent the sadf options and ## the values on the right their description (wich are used for grouping and prefixing metrics). [inputs.sysstat.options] -C = "cpu" -B = "paging" -b = "io" -d = "disk" # requires DISK activity -H = "hugepages" "-n ALL" = "network" "-P ALL" = "per_cpu" -q = "queue" -R = "mem" "-r ALL" = "mem_util" -S = "swap_util" -u = "cpu_util" -v = "inode" -W = "swap" -w = "task" # "-I ALL" = "interrupts" # requires INT activity # # ## Device tags can be used to add additional tags for devices. For example the configuration below ## adds a tag vg with value rootvg for all metrics with sda devices. # [[inputs.sysstat.device_tags.sda]] # vg = "rootvg" ` func (*Sysstat) SampleConfig() string { return sampleConfig } func (s *Sysstat) Gather(acc telegraf.Accumulator) error { if s.interval == 0 { if firstTimestamp.IsZero() { firstTimestamp = time.Now() } else { s.interval = int(time.Since(firstTimestamp).Seconds()) } } ts := time.Now().Add(time.Duration(s.interval) * time.Second) if err := s.collect(); err != nil { return err } var wg sync.WaitGroup errorChannel := make(chan error, len(s.Options)*2) for option := range s.Options { wg.Add(1) go func(acc telegraf.Accumulator, option string) { defer wg.Done() if err := s.parse(acc, option, ts); err != nil { errorChannel <- err } }(acc, option) } wg.Wait() close(errorChannel) errorStrings := []string{} for err := range errorChannel { errorStrings = append(errorStrings, err.Error()) } if _, err := os.Stat(s.tmpFile); err == nil { if err := os.Remove(s.tmpFile); err != nil { errorStrings = append(errorStrings, err.Error()) } } if len(errorStrings) == 0 { return nil } return errors.New(strings.Join(errorStrings, "\n")) } // collect collects sysstat data with the collector utility sadc. It runs the following command: // Sadc -S -S ... 2 tmpFile // The above command collects system metrics during and saves it in binary form to tmpFile. func (s *Sysstat) collect() error { if len(s.Activities) == 0 { s.Activities = dfltActivities } if len(s.Sadf) == 0 { sadf, err := exec.LookPath("sadf") if err != nil { return errors.New("sadf not in $PATH, configure path to sadf") } s.Sadf = sadf } options := []string{} for _, act := range s.Activities { options = append(options, "-S", act) } s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix())) collectInterval := s.interval - parseInterval // collectInterval has to be smaller than the telegraf data collection interval if collectInterval < 0 { // If true, interval is not defined yet and Gather is run for the first time. collectInterval = 1 // In that case we only collect for 1 second. } options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile) cmd := execCommand(s.Sadc, options...) out, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("failed to run command %s: %s", strings.Join(cmd.Args, " "), string(out)) } return nil } // parse runs Sadf on the previously saved tmpFile: // Sadf -p -- -p