diff --git a/plugins/all/all.go b/plugins/all/all.go index 595fdcaf4..2ca3c8cb3 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -3,6 +3,7 @@ package all import ( _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" + _ "github.com/influxdb/telegraf/plugins/lustre2" _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mongodb" _ "github.com/influxdb/telegraf/plugins/mysql" diff --git a/plugins/lustre2/lustre2.go b/plugins/lustre2/lustre2.go new file mode 100644 index 000000000..72a83e5f1 --- /dev/null +++ b/plugins/lustre2/lustre2.go @@ -0,0 +1,236 @@ +// +build linux + +/* +Lustre 2.x telegraf plugin + +Lustre (http://lustre.org/) is an open-source, parallel file system +for HPC environments. It stores statistics about its activity in +/proc + +*/ +package lustre2 + +import ( + "path/filepath" + "strconv" + "strings" + + "github.com/influxdb/telegraf/plugins" + common "github.com/influxdb/telegraf/plugins/system/ps/common" +) + +// Lustre proc files can change between versions, so we want to future-proof +// by letting people choose what to look at. +type Lustre2 struct { + Ost_procfiles []string + Mds_procfiles []string +} + +var sampleConfig = ` +# An array of /proc globs to search for Lustre stats +# If not specified, the default will work on Lustre 2.5.x +# +# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"] +# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]` + +/* The wanted fields would be a []string if not for the +lines that start with read_bytes/write_bytes and contain + both the byte count and the function call count +*/ +type mapping struct { + inProc string // What to look for at the start of a line in /proc/fs/lustre/* + field uint32 // which field to extract from that line + reportAs string // What measurement name to use + tag string // Additional tag to add for this metric +} + +var wanted_ost_fields = []*mapping{ + { + inProc: "write_bytes", + field: 6, + reportAs: "write_bytes", + }, + { // line starts with 'write_bytes', but value write_calls is in second column + inProc: "write_bytes", + field: 1, + reportAs: "write_calls", + }, + { + inProc: "read_bytes", + field: 6, + reportAs: "read_bytes", + }, + { // line starts with 'read_bytes', but value read_calls is in second column + inProc: "read_bytes", + field: 1, + reportAs: "read_calls", + }, + { + inProc: "cache_hit", + }, + { + inProc: "cache_miss", + }, + { + inProc: "cache_access", + }, +} + +var wanted_mds_fields = []*mapping{ + { + inProc: "open", + }, + { + inProc: "close", + }, + { + inProc: "mknod", + }, + { + inProc: "link", + }, + { + inProc: "unlink", + }, + { + inProc: "mkdir", + }, + { + inProc: "rmdir", + }, + { + inProc: "rename", + }, + { + inProc: "getattr", + }, + { + inProc: "setattr", + }, + { + inProc: "getxattr", + }, + { + inProc: "setxattr", + }, + { + inProc: "statfs", + }, + { + inProc: "sync", + }, + { + inProc: "samedir_rename", + }, + { + inProc: "crossdir_rename", + }, +} + +func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc plugins.Accumulator) error { + files, err := filepath.Glob(fileglob) + if err != nil { + return err + } + + for _, file := range files { + /* Turn /proc/fs/lustre/obdfilter//stats and similar + * into just the object store target name + * Assumpion: the target name is always second to last, + * which is true in Lustre 2.1->2.5 + */ + path := strings.Split(file, "/") + name := path[len(path)-2] + tags := map[string]string{ + "name": name, + } + + lines, err := common.ReadLines(file) + if err != nil { + return err + } + + for _, line := range lines { + fields := strings.Fields(line) + + for _, wanted := range wanted_fields { + var data uint64 + if fields[0] == wanted.inProc { + wanted_field := wanted.field + // if not set, assume field[1]. Shouldn't be field[0], as + // that's a string + if wanted_field == 0 { + wanted_field = 1 + } + data, err = strconv.ParseUint((fields[wanted_field]), 10, 64) + if err != nil { + return err + } + report_name := wanted.inProc + if wanted.reportAs != "" { + report_name = wanted.reportAs + } + acc.Add(report_name, data, tags) + + } + } + } + } + return nil +} + +// SampleConfig returns sample configuration message +func (l *Lustre2) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Lustre2 plugin +func (l *Lustre2) Description() string { + return "Read metrics from local Lustre service on OST, MDS" +} + +// Gather reads stats from all lustre targets +func (l *Lustre2) Gather(acc plugins.Accumulator) error { + + if len(l.Ost_procfiles) == 0 { + // read/write bytes are in obdfilter//stats + err := l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/stats", wanted_ost_fields, acc) + if err != nil { + return err + } + // cache counters are in osd-ldiskfs//stats + err = l.GetLustreProcStats("/proc/fs/lustre/osd-ldiskfs/*/stats", wanted_ost_fields, acc) + if err != nil { + return err + } + } + + if len(l.Mds_procfiles) == 0 { + // Metadata server stats + err := l.GetLustreProcStats("/proc/fs/lustre/mdt/*/md_stats", wanted_mds_fields, acc) + if err != nil { + return err + } + } + + for _, procfile := range l.Ost_procfiles { + err := l.GetLustreProcStats(procfile, wanted_ost_fields, acc) + if err != nil { + return err + } + } + for _, procfile := range l.Mds_procfiles { + err := l.GetLustreProcStats(procfile, wanted_mds_fields, acc) + if err != nil { + return err + } + } + + return nil +} + +func init() { + plugins.Add("lustre2", func() plugins.Plugin { + return &Lustre2{} + }) +} diff --git a/plugins/lustre2/lustre2_test.go b/plugins/lustre2/lustre2_test.go new file mode 100644 index 000000000..850a4ff32 --- /dev/null +++ b/plugins/lustre2/lustre2_test.go @@ -0,0 +1,144 @@ +package lustre2 + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Set config file variables to point to fake directory structure instead of /proc? + +const obdfilterProcContents = `snapshot_time 1438693064.430544 secs.usecs +read_bytes 203238095 samples [bytes] 4096 1048576 78026117632000 +write_bytes 71893382 samples [bytes] 1 1048576 15201500833981 +get_info 1182008495 samples [reqs] +set_info_async 2 samples [reqs] +connect 1117 samples [reqs] +reconnect 1160 samples [reqs] +disconnect 1084 samples [reqs] +statfs 3575885 samples [reqs] +create 698 samples [reqs] +destroy 3190060 samples [reqs] +setattr 605647 samples [reqs] +punch 805187 samples [reqs] +sync 6608753 samples [reqs] +preprw 275131477 samples [reqs] +commitrw 275131477 samples [reqs] +quotactl 229231 samples [reqs] +ping 78020757 samples [reqs] +` + +const osdldiskfsProcContents = `snapshot_time 1438693135.640551 secs.usecs +get_page 275132812 samples [usec] 0 3147 1320420955 22041662259 +cache_access 19047063027 samples [pages] 1 1 19047063027 +cache_hit 7393729777 samples [pages] 1 1 7393729777 +cache_miss 11653333250 samples [pages] 1 1 11653333250 +` + +const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs +open 1024577037 samples [reqs] +close 873243496 samples [reqs] +mknod 349042 samples [reqs] +link 445 samples [reqs] +unlink 3549417 samples [reqs] +mkdir 705499 samples [reqs] +rmdir 227434 samples [reqs] +rename 629196 samples [reqs] +getattr 1503663097 samples [reqs] +setattr 1898364 samples [reqs] +getxattr 6145349681 samples [reqs] +setxattr 83969 samples [reqs] +statfs 2916320 samples [reqs] +sync 434081 samples [reqs] +samedir_rename 259625 samples [reqs] +crossdir_rename 369571 samples [reqs] +` + +type metrics struct { + name string + value uint64 +} + +func TestLustre2GeneratesMetrics(t *testing.T) { + + tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/" + ost_name := "OST0001" + + mdtdir := tempdir + "/mdt/" + err := os.MkdirAll(mdtdir+"/"+ost_name, 0755) + require.NoError(t, err) + + osddir := tempdir + "/osd-ldiskfs/" + err = os.MkdirAll(osddir+"/"+ost_name, 0755) + require.NoError(t, err) + + obddir := tempdir + "/obdfilter/" + err = os.MkdirAll(obddir+"/"+ost_name, 0755) + require.NoError(t, err) + + err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/md_stats", []byte(mdtProcContents), 0644) + require.NoError(t, err) + + err = ioutil.WriteFile(osddir+"/"+ost_name+"/stats", []byte(osdldiskfsProcContents), 0644) + require.NoError(t, err) + + err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644) + require.NoError(t, err) + + m := &Lustre2{ + Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"}, + Mds_procfiles: []string{mdtdir + "/*/md_stats"}, + } + + var acc testutil.Accumulator + + err = m.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "name": ost_name, + } + + intMetrics := []*metrics{ + { + name: "write_bytes", + value: 15201500833981, + }, + { + name: "read_bytes", + value: 78026117632000, + }, + { + name: "write_calls", + value: 71893382, + }, + { + name: "read_calls", + value: 203238095, + }, + { + name: "cache_hit", + value: 7393729777, + }, + { + name: "cache_access", + value: 19047063027, + }, + { + name: "cache_miss", + value: 11653333250, + }, + } + + for _, metric := range intMetrics { + assert.True(t, acc.HasUIntValue(metric.name), metric.name) + assert.True(t, acc.CheckTaggedValue(metric.name, metric.value, tags)) + } + + err = os.RemoveAll(os.TempDir() + "/telegraf") + require.NoError(t, err) +}