Fix only one job per storage target reported in lustre2 input (#5771)

This commit is contained in:
frroberts 2019-05-07 02:57:01 +03:00 committed by Daniel Nelson
parent 1e1fa1a580
commit 8abf8c10a7
2 changed files with 148 additions and 48 deletions

View File

@ -9,23 +9,27 @@ for HPC environments. It stores statistics about its activity in
package lustre2
import (
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type tags struct {
name, job string
}
// Lustre proc files can change between versions, so we want to future-proof
// by letting people choose what to look at.
type Lustre2 struct {
Ost_procfiles []string
Mds_procfiles []string
Ost_procfiles []string `toml:"ost_jobstat"`
Mds_procfiles []string `toml:"mds_jobstat"`
// allFields maps and OST name to the metric fields associated with that OST
allFields map[string]map[string]interface{}
allFields map[tags]map[string]interface{}
}
var sampleConfig = `
@ -353,7 +357,7 @@ var wanted_mdt_jobstats_fields = []*mapping{
},
}
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping, acc telegraf.Accumulator) error {
files, err := filepath.Glob(fileglob)
if err != nil {
return err
@ -367,43 +371,56 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
*/
path := strings.Split(file, "/")
name := path[len(path)-2]
var fields map[string]interface{}
fields, ok := l.allFields[name]
if !ok {
fields = make(map[string]interface{})
l.allFields[name] = fields
}
lines, err := internal.ReadLines(file)
//lines, err := internal.ReadLines(file)
wholeFile, err := ioutil.ReadFile(file)
if err != nil {
return err
}
jobs := strings.Split(string(wholeFile), "-")
for _, job := range jobs {
lines := strings.Split(string(job), "\n")
jobid := ""
for _, line := range lines {
parts := strings.Fields(line)
if strings.HasPrefix(line, "- job_id:") {
// Set the job_id explicitly if present
fields["jobid"] = parts[2]
// figure out if the data should be tagged with job_id here
parts := strings.Fields(lines[0])
if strings.TrimSuffix(parts[0], ":") == "job_id" {
jobid = parts[1]
}
for _, wanted := range wanted_fields {
var data uint64
if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
wanted_field := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as
// that's a string
if wanted_field == 0 {
wanted_field = 1
for _, line := range lines {
// skip any empty lines
if len(line) < 1 {
continue
}
parts := strings.Fields(line)
var fields map[string]interface{}
fields, ok := l.allFields[tags{name, jobid}]
if !ok {
fields = make(map[string]interface{})
l.allFields[tags{name, jobid}] = fields
}
for _, wanted := range wantedFields {
var data uint64
if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
wantedField := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as
// that's a string
if wantedField == 0 {
wantedField = 1
}
data, err = strconv.ParseUint(strings.TrimSuffix((parts[wantedField]), ","), 10, 64)
if err != nil {
return err
}
reportName := wanted.inProc
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
fields[reportName] = data
}
data, err = strconv.ParseUint(strings.TrimSuffix((parts[wanted_field]), ","), 10, 64)
if err != nil {
return err
}
report_name := wanted.inProc
if wanted.reportAs != "" {
report_name = wanted.reportAs
}
fields[report_name] = data
}
}
}
@ -423,7 +440,8 @@ func (l *Lustre2) Description() string {
// Gather reads stats from all lustre targets
func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
l.allFields = make(map[string]map[string]interface{})
//l.allFields = make(map[string]map[string]interface{})
l.allFields = make(map[tags]map[string]interface{})
if len(l.Ost_procfiles) == 0 {
// read/write bytes are in obdfilter/<ost_name>/stats
@ -483,15 +501,13 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
}
}
for name, fields := range l.allFields {
for tgs, fields := range l.allFields {
tags := map[string]string{
"name": name,
"name": tgs.name,
}
if _, ok := fields["jobid"]; ok {
if jobid, ok := fields["jobid"].(string); ok {
tags["jobid"] = jobid
}
delete(fields, "jobid")
if len(tgs.job) > 0 {
tags["jobid"] = tgs.job
}
acc.AddFields("lustre2", fields, tags)
}

View File

@ -53,6 +53,20 @@ const obdfilterJobStatsContents = `job_stats:
get_info: { samples: 0, unit: reqs }
set_info: { samples: 0, unit: reqs }
quotactl: { samples: 0, unit: reqs }
- job_id: testjob2
snapshot_time: 1461772761
read_bytes: { samples: 1, unit: bytes, min: 1024, max: 1024, sum: 1024 }
write_bytes: { samples: 25, unit: bytes, min: 2048, max: 2048, sum: 51200 }
getattr: { samples: 0, unit: reqs }
setattr: { samples: 0, unit: reqs }
punch: { samples: 1, unit: reqs }
sync: { samples: 0, unit: reqs }
destroy: { samples: 0, unit: reqs }
create: { samples: 0, unit: reqs }
statfs: { samples: 0, unit: reqs }
get_info: { samples: 0, unit: reqs }
set_info: { samples: 0, unit: reqs }
quotactl: { samples: 0, unit: reqs }
`
const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs
@ -93,6 +107,24 @@ const mdtJobStatsContents = `job_stats:
sync: { samples: 2, unit: reqs }
samedir_rename: { samples: 705, unit: reqs }
crossdir_rename: { samples: 200, unit: reqs }
- job_id: testjob2
snapshot_time: 1461772761
open: { samples: 6, unit: reqs }
close: { samples: 7, unit: reqs }
mknod: { samples: 8, unit: reqs }
link: { samples: 9, unit: reqs }
unlink: { samples: 20, unit: reqs }
mkdir: { samples: 200, unit: reqs }
rmdir: { samples: 210, unit: reqs }
rename: { samples: 8, unit: reqs }
getattr: { samples: 10, unit: reqs }
setattr: { samples: 2, unit: reqs }
getxattr: { samples: 4, unit: reqs }
setxattr: { samples: 5, unit: reqs }
statfs: { samples: 1207, unit: reqs }
sync: { samples: 3, unit: reqs }
samedir_rename: { samples: 706, unit: reqs }
crossdir_rename: { samples: 201, unit: reqs }
`
func TestLustre2GeneratesMetrics(t *testing.T) {
@ -172,7 +204,7 @@ func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
ost_name := "OST0001"
job_name := "testjob1"
job_names := []string{"testjob1", "testjob2"}
mdtdir := tempdir + "/mdt/"
err := os.MkdirAll(mdtdir+"/"+ost_name, 0755)
@ -199,12 +231,23 @@ func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
err = m.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"name": ost_name,
"jobid": job_name,
// make this two tags
// and even further make this dependent on summing per OST
tags := []map[string]string{
{
"name": ost_name,
"jobid": job_names[0],
},
{
"name": ost_name,
"jobid": job_names[1],
},
}
fields := map[string]interface{}{
// make this for two tags
var fields []map[string]interface{}
fields = append(fields, map[string]interface{}{
"jobstats_read_calls": uint64(1),
"jobstats_read_min_size": uint64(4096),
"jobstats_read_max_size": uint64(4096),
@ -239,9 +282,50 @@ func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
"jobstats_sync": uint64(2),
"jobstats_samedir_rename": uint64(705),
"jobstats_crossdir_rename": uint64(200),
})
fields = append(fields, map[string]interface{}{
"jobstats_read_calls": uint64(1),
"jobstats_read_min_size": uint64(1024),
"jobstats_read_max_size": uint64(1024),
"jobstats_read_bytes": uint64(1024),
"jobstats_write_calls": uint64(25),
"jobstats_write_min_size": uint64(2048),
"jobstats_write_max_size": uint64(2048),
"jobstats_write_bytes": uint64(51200),
"jobstats_ost_getattr": uint64(0),
"jobstats_ost_setattr": uint64(0),
"jobstats_punch": uint64(1),
"jobstats_ost_sync": uint64(0),
"jobstats_destroy": uint64(0),
"jobstats_create": uint64(0),
"jobstats_ost_statfs": uint64(0),
"jobstats_get_info": uint64(0),
"jobstats_set_info": uint64(0),
"jobstats_quotactl": uint64(0),
"jobstats_open": uint64(6),
"jobstats_close": uint64(7),
"jobstats_mknod": uint64(8),
"jobstats_link": uint64(9),
"jobstats_unlink": uint64(20),
"jobstats_mkdir": uint64(200),
"jobstats_rmdir": uint64(210),
"jobstats_rename": uint64(8),
"jobstats_getattr": uint64(10),
"jobstats_setattr": uint64(2),
"jobstats_getxattr": uint64(4),
"jobstats_setxattr": uint64(5),
"jobstats_statfs": uint64(1207),
"jobstats_sync": uint64(3),
"jobstats_samedir_rename": uint64(706),
"jobstats_crossdir_rename": uint64(201),
})
for index := 0; index < len(fields); index++ {
acc.AssertContainsTaggedFields(t, "lustre2", fields[index], tags[index])
}
acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
// run this over both tags
err = os.RemoveAll(os.TempDir() + "/telegraf")
require.NoError(t, err)