From 8abf8c10a7e75bab3195b86fda8f84061de90398 Mon Sep 17 00:00:00 2001
From: frroberts
Date: Tue, 7 May 2019 02:57:01 +0300
Subject: [PATCH] Fix only one job per storage target reported in lustre2 input (#5771)

---
 plugins/inputs/lustre2/lustre2.go      | 100 ++++++++++++++-----------
 plugins/inputs/lustre2/lustre2_test.go |  96 ++++++++++++++++++++++--
 2 files changed, 148 insertions(+), 48 deletions(-)

diff --git a/plugins/inputs/lustre2/lustre2.go b/plugins/inputs/lustre2/lustre2.go
index 8ef9223b5..4ccb90115 100644
--- a/plugins/inputs/lustre2/lustre2.go
+++ b/plugins/inputs/lustre2/lustre2.go
@@ -9,23 +9,27 @@ for HPC environments. It stores statistics about its activity in
 package lustre2
 
 import (
+	"io/ioutil"
 	"path/filepath"
 	"strconv"
 	"strings"
 
 	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
+type tags struct {
+	name, job string
+}
+
 // Lustre proc files can change between versions, so we want to future-proof
 // by letting people choose what to look at.
 type Lustre2 struct {
-	Ost_procfiles []string
-	Mds_procfiles []string
+	Ost_procfiles []string `toml:"ost_procfiles"`
+	Mds_procfiles []string `toml:"mds_procfiles"`
 
 	// allFields maps and OST name to the metric fields associated with that OST
-	allFields map[string]map[string]interface{}
+	allFields map[tags]map[string]interface{}
 }
 
 var sampleConfig = `
@@ -353,7 +357,7 @@ var wanted_mdt_jobstats_fields = []*mapping{
 	},
 }
 
-func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
+func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping, acc telegraf.Accumulator) error {
 	files, err := filepath.Glob(fileglob)
 	if err != nil {
 		return err
@@ -367,43 +371,56 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
 		*/
 		path := strings.Split(file, "/")
 		name := path[len(path)-2]
-		var fields map[string]interface{}
-		fields, ok := l.allFields[name]
-		if !ok {
-			fields = make(map[string]interface{})
-			l.allFields[name] = fields
-		}
 
-		lines, err := internal.ReadLines(file)
+		//lines, err := internal.ReadLines(file)
+		wholeFile, err := ioutil.ReadFile(file)
 		if err != nil {
 			return err
 		}
+		jobs := strings.Split(string(wholeFile), "-")
+		for _, job := range jobs {
+			lines := strings.Split(string(job), "\n")
+			jobid := ""
 
-		for _, line := range lines {
-			parts := strings.Fields(line)
-			if strings.HasPrefix(line, "- job_id:") {
-				// Set the job_id explicitly if present
-				fields["jobid"] = parts[2]
+			// figure out if the data should be tagged with job_id here
+			parts := strings.Fields(lines[0])
+			if strings.TrimSuffix(parts[0], ":") == "job_id" {
+				jobid = parts[1]
 			}
 
-			for _, wanted := range wanted_fields {
-				var data uint64
-				if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
-					wanted_field := wanted.field
-					// if not set, assume field[1]. Shouldn't be field[0], as
-					// that's a string
-					if wanted_field == 0 {
-						wanted_field = 1
+			for _, line := range lines {
+				// skip any empty lines
+				if len(line) < 1 {
+					continue
+				}
+				parts := strings.Fields(line)
+
+				var fields map[string]interface{}
+				fields, ok := l.allFields[tags{name, jobid}]
+				if !ok {
+					fields = make(map[string]interface{})
+					l.allFields[tags{name, jobid}] = fields
+				}
+
+				for _, wanted := range wantedFields {
+					var data uint64
+					if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
+						wantedField := wanted.field
+						// if not set, assume field[1]. Shouldn't be field[0], as
+						// that's a string
+						if wantedField == 0 {
+							wantedField = 1
+						}
+						data, err = strconv.ParseUint(strings.TrimSuffix((parts[wantedField]), ","), 10, 64)
+						if err != nil {
+							return err
+						}
+						reportName := wanted.inProc
+						if wanted.reportAs != "" {
+							reportName = wanted.reportAs
+						}
+						fields[reportName] = data
 					}
-					data, err = strconv.ParseUint(strings.TrimSuffix((parts[wanted_field]), ","), 10, 64)
-					if err != nil {
-						return err
-					}
-					report_name := wanted.inProc
-					if wanted.reportAs != "" {
-						report_name = wanted.reportAs
-					}
-					fields[report_name] = data
 				}
 			}
 		}
@@ -423,7 +440,8 @@ func (l *Lustre2) Description() string {
 
 // Gather reads stats from all lustre targets
 func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
-	l.allFields = make(map[string]map[string]interface{})
+	//l.allFields = make(map[string]map[string]interface{})
+	l.allFields = make(map[tags]map[string]interface{})
 
 	if len(l.Ost_procfiles) == 0 {
 		// read/write bytes are in obdfilter/<ost_name>/stats
@@ -483,15 +501,13 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
 		}
 	}
 
-	for name, fields := range l.allFields {
+	for tgs, fields := range l.allFields {
+
 		tags := map[string]string{
-			"name": name,
+			"name": tgs.name,
 		}
-		if _, ok := fields["jobid"]; ok {
-			if jobid, ok := fields["jobid"].(string); ok {
-				tags["jobid"] = jobid
-			}
-			delete(fields, "jobid")
+		if len(tgs.job) > 0 {
+			tags["jobid"] = tgs.job
 		}
 		acc.AddFields("lustre2", fields, tags)
 	}
diff --git a/plugins/inputs/lustre2/lustre2_test.go b/plugins/inputs/lustre2/lustre2_test.go
index 5cc9c0e43..6d0fd61f5 100644
--- a/plugins/inputs/lustre2/lustre2_test.go
+++ b/plugins/inputs/lustre2/lustre2_test.go
@@ -53,6 +53,20 @@ const obdfilterJobStatsContents = `job_stats:
   get_info: { samples: 0, unit: reqs }
   set_info: { samples: 0, unit: reqs }
   quotactl: { samples: 0, unit: reqs }
+- job_id: testjob2
+  snapshot_time: 1461772761
+  read_bytes: { samples: 1, unit: bytes, min: 1024, max: 1024, sum: 1024 }
+  write_bytes: { samples: 25, unit: bytes, min: 2048, max: 2048, sum: 51200 }
+  getattr: { samples: 0, unit: reqs }
+  setattr: { samples: 0, unit: reqs }
+  punch: { samples: 1, unit: reqs }
+  sync: { samples: 0, unit: reqs }
+  destroy: { samples: 0, unit: reqs }
+  create: { samples: 0, unit: reqs }
+  statfs: { samples: 0, unit: reqs }
+  get_info: { samples: 0, unit: reqs }
+  set_info: { samples: 0, unit: reqs }
+  quotactl: { samples: 0, unit: reqs }
 `
 
 const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs
@@ -93,6 +107,24 @@ const mdtJobStatsContents = `job_stats:
   sync: { samples: 2, unit: reqs }
   samedir_rename: { samples: 705, unit: reqs }
   crossdir_rename: { samples: 200, unit: reqs }
+- job_id: testjob2
+  snapshot_time: 1461772761
+  open: { samples: 6, unit: reqs }
+  close: { samples: 7, unit: reqs }
+  mknod: { samples: 8, unit: reqs }
+  link: { samples: 9, unit: reqs }
+  unlink: { samples: 20, unit: reqs }
+  mkdir: { samples: 200, unit: reqs }
+  rmdir: { samples: 210, unit: reqs }
+  rename: { samples: 8, unit: reqs }
+  getattr: { samples: 10, unit: reqs }
+  setattr: { samples: 2, unit: reqs }
+  getxattr: { samples: 4, unit: reqs }
+  setxattr: { samples: 5, unit: reqs }
+  statfs: { samples: 1207, unit: reqs }
+  sync: { samples: 3, unit: reqs }
+  samedir_rename: { samples: 706, unit: reqs }
+  crossdir_rename: { samples: 201, unit: reqs }
 `
 
 func TestLustre2GeneratesMetrics(t *testing.T) {
@@ -172,7 +204,7 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
 	tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
 	ost_name := "OST0001"
"OST0001" - job_name := "testjob1" + job_names := []string{"testjob1", "testjob2"} mdtdir := tempdir + "/mdt/" err := os.MkdirAll(mdtdir+"/"+ost_name, 0755) @@ -199,12 +231,23 @@ func TestLustre2GeneratesJobstatsMetrics(t *testing.T) { err = m.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "name": ost_name, - "jobid": job_name, + // make this two tags + // and even further make this dependent on summing per OST + tags := []map[string]string{ + { + "name": ost_name, + "jobid": job_names[0], + }, + { + "name": ost_name, + "jobid": job_names[1], + }, } - fields := map[string]interface{}{ + // make this for two tags + var fields []map[string]interface{} + + fields = append(fields, map[string]interface{}{ "jobstats_read_calls": uint64(1), "jobstats_read_min_size": uint64(4096), "jobstats_read_max_size": uint64(4096), @@ -239,9 +282,50 @@ func TestLustre2GeneratesJobstatsMetrics(t *testing.T) { "jobstats_sync": uint64(2), "jobstats_samedir_rename": uint64(705), "jobstats_crossdir_rename": uint64(200), + }) + + fields = append(fields, map[string]interface{}{ + "jobstats_read_calls": uint64(1), + "jobstats_read_min_size": uint64(1024), + "jobstats_read_max_size": uint64(1024), + "jobstats_read_bytes": uint64(1024), + "jobstats_write_calls": uint64(25), + "jobstats_write_min_size": uint64(2048), + "jobstats_write_max_size": uint64(2048), + "jobstats_write_bytes": uint64(51200), + "jobstats_ost_getattr": uint64(0), + "jobstats_ost_setattr": uint64(0), + "jobstats_punch": uint64(1), + "jobstats_ost_sync": uint64(0), + "jobstats_destroy": uint64(0), + "jobstats_create": uint64(0), + "jobstats_ost_statfs": uint64(0), + "jobstats_get_info": uint64(0), + "jobstats_set_info": uint64(0), + "jobstats_quotactl": uint64(0), + "jobstats_open": uint64(6), + "jobstats_close": uint64(7), + "jobstats_mknod": uint64(8), + "jobstats_link": uint64(9), + "jobstats_unlink": uint64(20), + "jobstats_mkdir": uint64(200), + "jobstats_rmdir": uint64(210), + "jobstats_rename": uint64(8), + "jobstats_getattr": uint64(10), + "jobstats_setattr": uint64(2), + "jobstats_getxattr": uint64(4), + "jobstats_setxattr": uint64(5), + "jobstats_statfs": uint64(1207), + "jobstats_sync": uint64(3), + "jobstats_samedir_rename": uint64(706), + "jobstats_crossdir_rename": uint64(201), + }) + + for index := 0; index < len(fields); index++ { + acc.AssertContainsTaggedFields(t, "lustre2", fields[index], tags[index]) } - acc.AssertContainsTaggedFields(t, "lustre2", fields, tags) + // run this over both tags err = os.RemoveAll(os.TempDir() + "/telegraf") require.NoError(t, err)