Adding Jobstats support to Lustre2 input plugin

Lustre Jobstats allows for RPCs to be tagged with a value, such
as a job's ID.  This allows for per job statistics. This plugin
collects statistics and tags the data with the jobid.

closes #1107
This commit is contained in:
Jesse Hanley 2016-04-27 15:14:25 -04:00 committed by Cameron Sparr
parent 89f2c0b0a4
commit a7b0861436
3 changed files with 378 additions and 7 deletions

View File

@ -57,6 +57,7 @@ based on _prefix_ in addition to globs. This means that a filter like
- [#1086](https://github.com/influxdata/telegraf/pull/1086): Ability to specify AWS keys in config file. Thanks @johnrengleman!
- [#1096](https://github.com/influxdata/telegraf/pull/1096): Performance refactor of running output buffers.
- [#967](https://github.com/influxdata/telegraf/issues/967): Buffer logging improvements.
- [#1107](https://github.com/influxdata/telegraf/issues/1107): Support lustre2 job stats. Thanks @hanleyja!
### Bugfixes

View File

@ -34,9 +34,13 @@ var sampleConfig = `
##
# ost_procfiles = [
# "/proc/fs/lustre/obdfilter/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/stats"
# "/proc/fs/lustre/osd-ldiskfs/*/stats",
# "/proc/fs/lustre/obdfilter/*/job_stats",
# ]
# mds_procfiles = [
# "/proc/fs/lustre/mdt/*/md_stats",
# "/proc/fs/lustre/mdt/*/job_stats",
# ]
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
`
/* The wanted fields would be a []string if not for the
@ -82,6 +86,139 @@ var wanted_ost_fields = []*mapping{
},
}
var wanted_ost_jobstats_fields = []*mapping{
{ // The read line has several fields, so we need to differentiate what they are
inProc: "read",
field: 3,
reportAs: "jobstats_read_calls",
},
{
inProc: "read",
field: 7,
reportAs: "jobstats_read_min_size",
},
{
inProc: "read",
field: 9,
reportAs: "jobstats_read_max_size",
},
{
inProc: "read",
field: 11,
reportAs: "jobstats_read_bytes",
},
{ // Different inProc for newer versions
inProc: "read_bytes",
field: 3,
reportAs: "jobstats_read_calls",
},
{
inProc: "read_bytes",
field: 7,
reportAs: "jobstats_read_min_size",
},
{
inProc: "read_bytes",
field: 9,
reportAs: "jobstats_read_max_size",
},
{
inProc: "read_bytes",
field: 11,
reportAs: "jobstats_read_bytes",
},
{ // We need to do the same for the write fields
inProc: "write",
field: 3,
reportAs: "jobstats_write_calls",
},
{
inProc: "write",
field: 7,
reportAs: "jobstats_write_min_size",
},
{
inProc: "write",
field: 9,
reportAs: "jobstats_write_max_size",
},
{
inProc: "write",
field: 11,
reportAs: "jobstats_write_bytes",
},
{ // Different inProc for newer versions
inProc: "write_bytes",
field: 3,
reportAs: "jobstats_write_calls",
},
{
inProc: "write_bytes",
field: 7,
reportAs: "jobstats_write_min_size",
},
{
inProc: "write_bytes",
field: 9,
reportAs: "jobstats_write_max_size",
},
{
inProc: "write_bytes",
field: 11,
reportAs: "jobstats_write_bytes",
},
{
inProc: "getattr",
field: 3,
reportAs: "jobstats_ost_getattr",
},
{
inProc: "setattr",
field: 3,
reportAs: "jobstats_ost_setattr",
},
{
inProc: "punch",
field: 3,
reportAs: "jobstats_punch",
},
{
inProc: "sync",
field: 3,
reportAs: "jobstats_ost_sync",
},
{
inProc: "destroy",
field: 3,
reportAs: "jobstats_destroy",
},
{
inProc: "create",
field: 3,
reportAs: "jobstats_create",
},
{
inProc: "statfs",
field: 3,
reportAs: "jobstats_ost_statfs",
},
{
inProc: "get_info",
field: 3,
reportAs: "jobstats_get_info",
},
{
inProc: "set_info",
field: 3,
reportAs: "jobstats_set_info",
},
{
inProc: "quotactl",
field: 3,
reportAs: "jobstats_quotactl",
},
}
var wanted_mds_fields = []*mapping{
{
inProc: "open",
@ -133,6 +270,89 @@ var wanted_mds_fields = []*mapping{
},
}
var wanted_mdt_jobstats_fields = []*mapping{
{
inProc: "open",
field: 3,
reportAs: "jobstats_open",
},
{
inProc: "close",
field: 3,
reportAs: "jobstats_close",
},
{
inProc: "mknod",
field: 3,
reportAs: "jobstats_mknod",
},
{
inProc: "link",
field: 3,
reportAs: "jobstats_link",
},
{
inProc: "unlink",
field: 3,
reportAs: "jobstats_unlink",
},
{
inProc: "mkdir",
field: 3,
reportAs: "jobstats_mkdir",
},
{
inProc: "rmdir",
field: 3,
reportAs: "jobstats_rmdir",
},
{
inProc: "rename",
field: 3,
reportAs: "jobstats_rename",
},
{
inProc: "getattr",
field: 3,
reportAs: "jobstats_getattr",
},
{
inProc: "setattr",
field: 3,
reportAs: "jobstats_setattr",
},
{
inProc: "getxattr",
field: 3,
reportAs: "jobstats_getxattr",
},
{
inProc: "setxattr",
field: 3,
reportAs: "jobstats_setxattr",
},
{
inProc: "statfs",
field: 3,
reportAs: "jobstats_statfs",
},
{
inProc: "sync",
field: 3,
reportAs: "jobstats_sync",
},
{
inProc: "samedir_rename",
field: 3,
reportAs: "jobstats_samedir_rename",
},
{
inProc: "crossdir_rename",
field: 3,
reportAs: "jobstats_crossdir_rename",
},
}
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
files, err := filepath.Glob(fileglob)
if err != nil {
@ -143,7 +363,7 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
/* Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
* into just the object store target name
* Assumpion: the target name is always second to last,
* which is true in Lustre 2.1->2.5
* which is true in Lustre 2.1->2.8
*/
path := strings.Split(file, "/")
name := path[len(path)-2]
@ -161,16 +381,21 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
for _, line := range lines {
parts := strings.Fields(line)
if strings.HasPrefix(line, "- job_id:") {
// Set the job_id explicitly if present
fields["jobid"] = parts[2]
}
for _, wanted := range wanted_fields {
var data uint64
if parts[0] == wanted.inProc {
if strings.TrimSuffix(parts[0], ":") == wanted.inProc {
wanted_field := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as
// that's a string
if wanted_field == 0 {
wanted_field = 1
}
data, err = strconv.ParseUint((parts[wanted_field]), 10, 64)
data, err = strconv.ParseUint(strings.TrimSuffix((parts[wanted_field]), ","), 10, 64)
if err != nil {
return err
}
@ -213,6 +438,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
if err != nil {
return err
}
// per job statistics are in obdfilter/<ost_name>/job_stats
err = l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/job_stats",
wanted_ost_jobstats_fields, acc)
if err != nil {
return err
}
}
if len(l.Mds_procfiles) == 0 {
@ -222,16 +453,31 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
if err != nil {
return err
}
// Metadata target job stats
err = l.GetLustreProcStats("/proc/fs/lustre/mdt/*/job_stats",
wanted_mdt_jobstats_fields, acc)
if err != nil {
return err
}
}
for _, procfile := range l.Ost_procfiles {
err := l.GetLustreProcStats(procfile, wanted_ost_fields, acc)
ost_fields := wanted_ost_fields
if strings.HasSuffix(procfile, "job_stats") {
ost_fields = wanted_ost_jobstats_fields
}
err := l.GetLustreProcStats(procfile, ost_fields, acc)
if err != nil {
return err
}
}
for _, procfile := range l.Mds_procfiles {
err := l.GetLustreProcStats(procfile, wanted_mds_fields, acc)
mdt_fields := wanted_mds_fields
if strings.HasSuffix(procfile, "job_stats") {
mdt_fields = wanted_mdt_jobstats_fields
}
err := l.GetLustreProcStats(procfile, mdt_fields, acc)
if err != nil {
return err
}
@ -241,6 +487,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
tags := map[string]string{
"name": name,
}
if _, ok := fields["jobid"]; ok {
if jobid, ok := fields["jobid"].(string); ok {
tags["jobid"] = jobid
}
delete(fields, "jobid")
}
acc.AddFields("lustre2", fields, tags)
}

View File

@ -38,6 +38,23 @@ cache_hit 7393729777 samples [pages] 1 1 7393729777
cache_miss 11653333250 samples [pages] 1 1 11653333250
`
const obdfilterJobStatsContents = `job_stats:
- job_id: testjob1
snapshot_time: 1461772761
read_bytes: { samples: 1, unit: bytes, min: 4096, max: 4096, sum: 4096 }
write_bytes: { samples: 25, unit: bytes, min: 1048576, max: 1048576, sum: 26214400 }
getattr: { samples: 0, unit: reqs }
setattr: { samples: 0, unit: reqs }
punch: { samples: 1, unit: reqs }
sync: { samples: 0, unit: reqs }
destroy: { samples: 0, unit: reqs }
create: { samples: 0, unit: reqs }
statfs: { samples: 0, unit: reqs }
get_info: { samples: 0, unit: reqs }
set_info: { samples: 0, unit: reqs }
quotactl: { samples: 0, unit: reqs }
`
const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs
open 1024577037 samples [reqs]
close 873243496 samples [reqs]
@ -57,6 +74,27 @@ samedir_rename 259625 samples [reqs]
crossdir_rename 369571 samples [reqs]
`
const mdtJobStatsContents = `job_stats:
- job_id: testjob1
snapshot_time: 1461772761
open: { samples: 5, unit: reqs }
close: { samples: 4, unit: reqs }
mknod: { samples: 6, unit: reqs }
link: { samples: 8, unit: reqs }
unlink: { samples: 90, unit: reqs }
mkdir: { samples: 521, unit: reqs }
rmdir: { samples: 520, unit: reqs }
rename: { samples: 9, unit: reqs }
getattr: { samples: 11, unit: reqs }
setattr: { samples: 1, unit: reqs }
getxattr: { samples: 3, unit: reqs }
setxattr: { samples: 4, unit: reqs }
statfs: { samples: 1205, unit: reqs }
sync: { samples: 2, unit: reqs }
samedir_rename: { samples: 705, unit: reqs }
crossdir_rename: { samples: 200, unit: reqs }
`
func TestLustre2GeneratesMetrics(t *testing.T) {
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
@ -83,6 +121,7 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644)
require.NoError(t, err)
// Begin by testing standard Lustre stats
m := &Lustre2{
Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"},
Mds_procfiles: []string{mdtdir + "/*/md_stats"},
@ -128,3 +167,82 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
err = os.RemoveAll(os.TempDir() + "/telegraf")
require.NoError(t, err)
}
func TestLustre2GeneratesJobstatsMetrics(t *testing.T) {
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
ost_name := "OST0001"
job_name := "testjob1"
mdtdir := tempdir + "/mdt/"
err := os.MkdirAll(mdtdir+"/"+ost_name, 0755)
require.NoError(t, err)
obddir := tempdir + "/obdfilter/"
err = os.MkdirAll(obddir+"/"+ost_name, 0755)
require.NoError(t, err)
err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/job_stats", []byte(mdtJobStatsContents), 0644)
require.NoError(t, err)
err = ioutil.WriteFile(obddir+"/"+ost_name+"/job_stats", []byte(obdfilterJobStatsContents), 0644)
require.NoError(t, err)
// Test Lustre Jobstats
m := &Lustre2{
Ost_procfiles: []string{obddir + "/*/job_stats"},
Mds_procfiles: []string{mdtdir + "/*/job_stats"},
}
var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"name": ost_name,
"jobid": job_name,
}
fields := map[string]interface{}{
"jobstats_read_calls": uint64(1),
"jobstats_read_min_size": uint64(4096),
"jobstats_read_max_size": uint64(4096),
"jobstats_read_bytes": uint64(4096),
"jobstats_write_calls": uint64(25),
"jobstats_write_min_size": uint64(1048576),
"jobstats_write_max_size": uint64(1048576),
"jobstats_write_bytes": uint64(26214400),
"jobstats_ost_getattr": uint64(0),
"jobstats_ost_setattr": uint64(0),
"jobstats_punch": uint64(1),
"jobstats_ost_sync": uint64(0),
"jobstats_destroy": uint64(0),
"jobstats_create": uint64(0),
"jobstats_ost_statfs": uint64(0),
"jobstats_get_info": uint64(0),
"jobstats_set_info": uint64(0),
"jobstats_quotactl": uint64(0),
"jobstats_open": uint64(5),
"jobstats_close": uint64(4),
"jobstats_mknod": uint64(6),
"jobstats_link": uint64(8),
"jobstats_unlink": uint64(90),
"jobstats_mkdir": uint64(521),
"jobstats_rmdir": uint64(520),
"jobstats_rename": uint64(9),
"jobstats_getattr": uint64(11),
"jobstats_setattr": uint64(1),
"jobstats_getxattr": uint64(3),
"jobstats_setxattr": uint64(4),
"jobstats_statfs": uint64(1205),
"jobstats_sync": uint64(2),
"jobstats_samedir_rename": uint64(705),
"jobstats_crossdir_rename": uint64(200),
}
acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
err = os.RemoveAll(os.TempDir() + "/telegraf")
require.NoError(t, err)
}