Adding Jobstats support to Lustre2 input plugin
Lustre Jobstats allows RPCs to be tagged with a value, such as a job's ID, which makes per-job statistics possible. This plugin collects those statistics and tags the data with the jobid.
This commit is contained in:
parent
1c4043ab39
commit
c8626b6455
|
@ -34,9 +34,13 @@ var sampleConfig = `
|
|||
##
|
||||
# ost_procfiles = [
|
||||
# "/proc/fs/lustre/obdfilter/*/stats",
|
||||
# "/proc/fs/lustre/osd-ldiskfs/*/stats"
|
||||
# "/proc/fs/lustre/osd-ldiskfs/*/stats",
|
||||
# "/proc/fs/lustre/obdfilter/*/job_stats",
|
||||
# ]
|
||||
# mds_procfiles = [
|
||||
# "/proc/fs/lustre/mdt/*/md_stats",
|
||||
# "/proc/fs/lustre/mdt/*/job_stats",
|
||||
# ]
|
||||
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
|
||||
`
|
||||
|
||||
/* The wanted fields would be a []string if not for the
|
||||
|
@ -82,6 +86,139 @@ var wanted_ost_fields = []*mapping{
|
|||
},
|
||||
}
|
||||
|
||||
var wanted_ost_jobstats_fields = []*mapping{
|
||||
{ // The read line has several fields, so we need to differentiate what they are
|
||||
inProc: "read",
|
||||
field: 3,
|
||||
reportAs: "jobstats_read_calls",
|
||||
},
|
||||
{
|
||||
inProc: "read",
|
||||
field: 7,
|
||||
reportAs: "jobstats_read_min_size",
|
||||
},
|
||||
{
|
||||
inProc: "read",
|
||||
field: 9,
|
||||
reportAs: "jobstats_read_max_size",
|
||||
},
|
||||
{
|
||||
inProc: "read",
|
||||
field: 11,
|
||||
reportAs: "jobstats_read_bytes",
|
||||
},
|
||||
{ // Different inProc for newer versions
|
||||
inProc: "read_bytes",
|
||||
field: 3,
|
||||
reportAs: "jobstats_read_calls",
|
||||
},
|
||||
{
|
||||
inProc: "read_bytes",
|
||||
field: 7,
|
||||
reportAs: "jobstats_read_min_size",
|
||||
},
|
||||
{
|
||||
inProc: "read_bytes",
|
||||
field: 9,
|
||||
reportAs: "jobstats_read_max_size",
|
||||
},
|
||||
{
|
||||
inProc: "read_bytes",
|
||||
field: 11,
|
||||
reportAs: "jobstats_read_bytes",
|
||||
},
|
||||
{ // We need to do the same for the write fields
|
||||
inProc: "write",
|
||||
field: 3,
|
||||
reportAs: "jobstats_write_calls",
|
||||
},
|
||||
{
|
||||
inProc: "write",
|
||||
field: 7,
|
||||
reportAs: "jobstats_write_min_size",
|
||||
},
|
||||
{
|
||||
inProc: "write",
|
||||
field: 9,
|
||||
reportAs: "jobstats_write_max_size",
|
||||
},
|
||||
{
|
||||
inProc: "write",
|
||||
field: 11,
|
||||
reportAs: "jobstats_write_bytes",
|
||||
},
|
||||
{ // Different inProc for newer versions
|
||||
inProc: "write_bytes",
|
||||
field: 3,
|
||||
reportAs: "jobstats_write_calls",
|
||||
},
|
||||
{
|
||||
inProc: "write_bytes",
|
||||
field: 7,
|
||||
reportAs: "jobstats_write_min_size",
|
||||
},
|
||||
{
|
||||
inProc: "write_bytes",
|
||||
field: 9,
|
||||
reportAs: "jobstats_write_max_size",
|
||||
},
|
||||
{
|
||||
inProc: "write_bytes",
|
||||
field: 11,
|
||||
reportAs: "jobstats_write_bytes",
|
||||
},
|
||||
{
|
||||
inProc: "getattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_ost_getattr",
|
||||
},
|
||||
{
|
||||
inProc: "setattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_ost_setattr",
|
||||
},
|
||||
{
|
||||
inProc: "punch",
|
||||
field: 3,
|
||||
reportAs: "jobstats_punch",
|
||||
},
|
||||
{
|
||||
inProc: "sync",
|
||||
field: 3,
|
||||
reportAs: "jobstats_ost_sync",
|
||||
},
|
||||
{
|
||||
inProc: "destroy",
|
||||
field: 3,
|
||||
reportAs: "jobstats_destroy",
|
||||
},
|
||||
{
|
||||
inProc: "create",
|
||||
field: 3,
|
||||
reportAs: "jobstats_create",
|
||||
},
|
||||
{
|
||||
inProc: "statfs",
|
||||
field: 3,
|
||||
reportAs: "jobstats_ost_statfs",
|
||||
},
|
||||
{
|
||||
inProc: "get_info",
|
||||
field: 3,
|
||||
reportAs: "jobstats_get_info",
|
||||
},
|
||||
{
|
||||
inProc: "set_info",
|
||||
field: 3,
|
||||
reportAs: "jobstats_set_info",
|
||||
},
|
||||
{
|
||||
inProc: "quotactl",
|
||||
field: 3,
|
||||
reportAs: "jobstats_quotactl",
|
||||
},
|
||||
}
|
||||
|
||||
var wanted_mds_fields = []*mapping{
|
||||
{
|
||||
inProc: "open",
|
||||
|
@ -133,6 +270,89 @@ var wanted_mds_fields = []*mapping{
|
|||
},
|
||||
}
|
||||
|
||||
var wanted_mdt_jobstats_fields = []*mapping{
|
||||
{
|
||||
inProc: "open",
|
||||
field: 3,
|
||||
reportAs: "jobstats_open",
|
||||
},
|
||||
{
|
||||
inProc: "close",
|
||||
field: 3,
|
||||
reportAs: "jobstats_close",
|
||||
},
|
||||
{
|
||||
inProc: "mknod",
|
||||
field: 3,
|
||||
reportAs: "jobstats_mknod",
|
||||
},
|
||||
{
|
||||
inProc: "link",
|
||||
field: 3,
|
||||
reportAs: "jobstats_link",
|
||||
},
|
||||
{
|
||||
inProc: "unlink",
|
||||
field: 3,
|
||||
reportAs: "jobstats_unlink",
|
||||
},
|
||||
{
|
||||
inProc: "mkdir",
|
||||
field: 3,
|
||||
reportAs: "jobstats_mkdir",
|
||||
},
|
||||
{
|
||||
inProc: "rmdir",
|
||||
field: 3,
|
||||
reportAs: "jobstats_rmdir",
|
||||
},
|
||||
{
|
||||
inProc: "rename",
|
||||
field: 3,
|
||||
reportAs: "jobstats_rename",
|
||||
},
|
||||
{
|
||||
inProc: "getattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_getattr",
|
||||
},
|
||||
{
|
||||
inProc: "setattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_setattr",
|
||||
},
|
||||
{
|
||||
inProc: "getxattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_getxattr",
|
||||
},
|
||||
{
|
||||
inProc: "setxattr",
|
||||
field: 3,
|
||||
reportAs: "jobstats_setxattr",
|
||||
},
|
||||
{
|
||||
inProc: "statfs",
|
||||
field: 3,
|
||||
reportAs: "jobstats_statfs",
|
||||
},
|
||||
{
|
||||
inProc: "sync",
|
||||
field: 3,
|
||||
reportAs: "jobstats_sync",
|
||||
},
|
||||
{
|
||||
inProc: "samedir_rename",
|
||||
field: 3,
|
||||
reportAs: "jobstats_samedir_rename",
|
||||
},
|
||||
{
|
||||
inProc: "crossdir_rename",
|
||||
field: 3,
|
||||
reportAs: "jobstats_crossdir_rename",
|
||||
},
|
||||
}
|
||||
|
||||
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
|
||||
files, err := filepath.Glob(fileglob)
|
||||
if err != nil {
|
||||
|
@ -143,7 +363,7 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
|
|||
/* Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
|
||||
* into just the object store target name
|
||||
* Assumption: the target name is always second to last,
|
||||
* which is true in Lustre 2.1->2.5
|
||||
* which is true in Lustre 2.1->2.8
|
||||
*/
|
||||
path := strings.Split(file, "/")
|
||||
name := path[len(path)-2]
|
||||
|
@ -161,16 +381,21 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
|
|||
|
||||
for _, line := range lines {
|
||||
parts := strings.Fields(line)
|
||||
if strings.HasPrefix(line, "- job_id:") {
|
||||
// Set the job_id explicitly if present
|
||||
fields["jobid"] = parts[2]
|
||||
}
|
||||
|
||||
for _, wanted := range wanted_fields {
|
||||
var data uint64
|
||||
if parts[0] == wanted.inProc {
|
||||
if strings.TrimSuffix(parts[0],":") == wanted.inProc {
|
||||
wanted_field := wanted.field
|
||||
// if not set, assume field[1]. Shouldn't be field[0], as
|
||||
// that's a string
|
||||
if wanted_field == 0 {
|
||||
wanted_field = 1
|
||||
}
|
||||
data, err = strconv.ParseUint((parts[wanted_field]), 10, 64)
|
||||
data, err = strconv.ParseUint(strings.TrimSuffix((parts[wanted_field]), ","), 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -213,6 +438,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// per job statistics are in obdfilter/<ost_name>/job_stats
|
||||
err = l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/job_stats",
|
||||
wanted_ost_jobstats_fields, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(l.Mds_procfiles) == 0 {
|
||||
|
@ -222,16 +453,31 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Metadata target job stats
|
||||
err = l.GetLustreProcStats("/proc/fs/lustre/mdt/*/job_stats",
|
||||
wanted_mdt_jobstats_fields, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, procfile := range l.Ost_procfiles {
|
||||
err := l.GetLustreProcStats(procfile, wanted_ost_fields, acc)
|
||||
ost_fields := wanted_ost_fields
|
||||
if strings.HasSuffix(procfile, "job_stats") {
|
||||
ost_fields = wanted_ost_jobstats_fields
|
||||
}
|
||||
err := l.GetLustreProcStats(procfile, ost_fields, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, procfile := range l.Mds_procfiles {
|
||||
err := l.GetLustreProcStats(procfile, wanted_mds_fields, acc)
|
||||
mdt_fields := wanted_mds_fields
|
||||
if strings.HasSuffix(procfile, "job_stats") {
|
||||
mdt_fields = wanted_mdt_jobstats_fields
|
||||
}
|
||||
err := l.GetLustreProcStats(procfile, mdt_fields, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -241,6 +487,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
|
|||
tags := map[string]string{
|
||||
"name": name,
|
||||
}
|
||||
if _, ok := fields["jobid"]; ok {
|
||||
if jobid, ok := fields["jobid"].(string); ok {
|
||||
tags["jobid"] = jobid
|
||||
}
|
||||
delete(fields, "jobid")
|
||||
}
|
||||
acc.AddFields("lustre2", fields, tags)
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,23 @@ cache_hit 7393729777 samples [pages] 1 1 7393729777
|
|||
cache_miss 11653333250 samples [pages] 1 1 11653333250
|
||||
`
|
||||
|
||||
// obdfilterJobStatsContents is a sample obdfilter job_stats listing for a
// single job ("testjob1"). TestLustre2GeneratesJobstatsMetrics writes it to
// <obdfilter>/<ost_name>/job_stats and checks the values gathered from it.
const obdfilterJobStatsContents = `job_stats:
|
||||
- job_id: testjob1
|
||||
snapshot_time: 1461772761
|
||||
read_bytes: { samples: 1, unit: bytes, min: 4096, max: 4096, sum: 4096 }
|
||||
write_bytes: { samples: 25, unit: bytes, min: 1048576, max: 1048576, sum: 26214400 }
|
||||
getattr: { samples: 0, unit: reqs }
|
||||
setattr: { samples: 0, unit: reqs }
|
||||
punch: { samples: 1, unit: reqs }
|
||||
sync: { samples: 0, unit: reqs }
|
||||
destroy: { samples: 0, unit: reqs }
|
||||
create: { samples: 0, unit: reqs }
|
||||
statfs: { samples: 0, unit: reqs }
|
||||
get_info: { samples: 0, unit: reqs }
|
||||
set_info: { samples: 0, unit: reqs }
|
||||
quotactl: { samples: 0, unit: reqs }
|
||||
`
|
||||
|
||||
const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs
|
||||
open 1024577037 samples [reqs]
|
||||
close 873243496 samples [reqs]
|
||||
|
@ -57,6 +74,27 @@ samedir_rename 259625 samples [reqs]
|
|||
crossdir_rename 369571 samples [reqs]
|
||||
`
|
||||
|
||||
// mdtJobStatsContents is a sample metadata-target job_stats listing for a
// single job ("testjob1"). TestLustre2GeneratesJobstatsMetrics writes it to
// <mdt>/<ost_name>/job_stats and checks the values gathered from it.
const mdtJobStatsContents = `job_stats:
|
||||
- job_id: testjob1
|
||||
snapshot_time: 1461772761
|
||||
open: { samples: 5, unit: reqs }
|
||||
close: { samples: 4, unit: reqs }
|
||||
mknod: { samples: 6, unit: reqs }
|
||||
link: { samples: 8, unit: reqs }
|
||||
unlink: { samples: 90, unit: reqs }
|
||||
mkdir: { samples: 521, unit: reqs }
|
||||
rmdir: { samples: 520, unit: reqs }
|
||||
rename: { samples: 9, unit: reqs }
|
||||
getattr: { samples: 11, unit: reqs }
|
||||
setattr: { samples: 1, unit: reqs }
|
||||
getxattr: { samples: 3, unit: reqs }
|
||||
setxattr: { samples: 4, unit: reqs }
|
||||
statfs: { samples: 1205, unit: reqs }
|
||||
sync: { samples: 2, unit: reqs }
|
||||
samedir_rename: { samples: 705, unit: reqs }
|
||||
crossdir_rename: { samples: 200, unit: reqs }
|
||||
`
|
||||
|
||||
func TestLustre2GeneratesMetrics(t *testing.T) {
|
||||
|
||||
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
|
||||
|
@ -83,6 +121,7 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
|
|||
err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Begin by testing standard Lustre stats
|
||||
m := &Lustre2{
|
||||
Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"},
|
||||
Mds_procfiles: []string{mdtdir + "/*/md_stats"},
|
||||
|
@ -128,3 +167,82 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
|
|||
err = os.RemoveAll(os.TempDir() + "/telegraf")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
|
||||
|
||||
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
|
||||
ost_name := "OST0001"
|
||||
job_name := "testjob1"
|
||||
|
||||
mdtdir := tempdir + "/mdt/"
|
||||
err := os.MkdirAll(mdtdir+"/"+ost_name, 0755)
|
||||
require.NoError(t, err)
|
||||
|
||||
obddir := tempdir + "/obdfilter/"
|
||||
err = os.MkdirAll(obddir+"/"+ost_name, 0755)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/job_stats", []byte(mdtJobStatsContents), 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ioutil.WriteFile(obddir+"/"+ost_name+"/job_stats", []byte(obdfilterJobStatsContents), 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test Lustre Jobstats
|
||||
m := &Lustre2{
|
||||
Ost_procfiles: []string{obddir + "/*/job_stats"},
|
||||
Mds_procfiles: []string{mdtdir + "/*/job_stats"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err = m.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"name": ost_name,
|
||||
"jobid": job_name,
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"jobstats_read_calls": uint64(1),
|
||||
"jobstats_read_min_size": uint64(4096),
|
||||
"jobstats_read_max_size": uint64(4096),
|
||||
"jobstats_read_bytes": uint64(4096),
|
||||
"jobstats_write_calls": uint64(25),
|
||||
"jobstats_write_min_size": uint64(1048576),
|
||||
"jobstats_write_max_size": uint64(1048576),
|
||||
"jobstats_write_bytes": uint64(26214400),
|
||||
"jobstats_ost_getattr": uint64(0),
|
||||
"jobstats_ost_setattr": uint64(0),
|
||||
"jobstats_punch": uint64(1),
|
||||
"jobstats_ost_sync": uint64(0),
|
||||
"jobstats_destroy": uint64(0),
|
||||
"jobstats_create": uint64(0),
|
||||
"jobstats_ost_statfs": uint64(0),
|
||||
"jobstats_get_info": uint64(0),
|
||||
"jobstats_set_info": uint64(0),
|
||||
"jobstats_quotactl": uint64(0),
|
||||
"jobstats_open": uint64(5),
|
||||
"jobstats_close": uint64(4),
|
||||
"jobstats_mknod": uint64(6),
|
||||
"jobstats_link": uint64(8),
|
||||
"jobstats_unlink": uint64(90),
|
||||
"jobstats_mkdir": uint64(521),
|
||||
"jobstats_rmdir": uint64(520),
|
||||
"jobstats_rename": uint64(9),
|
||||
"jobstats_getattr": uint64(11),
|
||||
"jobstats_setattr": uint64(1),
|
||||
"jobstats_getxattr": uint64(3),
|
||||
"jobstats_setxattr": uint64(4),
|
||||
"jobstats_statfs": uint64(1205),
|
||||
"jobstats_sync": uint64(2),
|
||||
"jobstats_samedir_rename": uint64(705),
|
||||
"jobstats_crossdir_rename": uint64(200),
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
|
||||
|
||||
err = os.RemoveAll(os.TempDir() + "/telegraf")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue