telegraf/plugins/inputs/nfs/nfs.go

344 lines
7.9 KiB
Go
Raw Normal View History

package nfs
import (
"bufio"
"log"
"os"
"strconv"
"strings"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type NFS struct {
Iostat bool
Fullstat bool
}
var sampleConfig = `
# iostat = true
# Read iostat metrics
iostat = true
# fullstat = true
# Read all metrics
fullstat = true
`
func (n *NFS) SampleConfig() string {
return sampleConfig
}
func (n *NFS) Description() string {
return "Read NFS metrics from /proc/self/mountstats"
}
var eventsFields = []string{
"inoderevalidates",
"dentryrevalidates",
"datainvalidates",
"attrinvalidates",
"vfsopen",
"vfslookup",
"vfspermission",
"vfsupdatepage",
"vfsreadpage",
"vfsreadpages",
"vfswritepage",
"vfswritepages",
"vfsreaddir",
"vfssetattr",
"vfsflush",
"vfsfsync",
"vfslock",
"vfsrelease",
"congestionwait",
"setattrtrunc",
"extendwrite",
"sillyrenames",
"shortreads",
"shortwrites",
"delay",
"pnfsreads",
"pnfswrites",
}
var bytesFields = []string{
"normalreadbytes",
"normalwritebytes",
"directreadbytes",
"directwritebytes",
"serverreadbytes",
"serverwritebytes",
"readpages",
"writepages",
}
var xprtudpFields = []string{
// "port",
"bind_count",
"rpcsends",
"rpcreceives",
"badxids",
"inflightsends",
"backlogutil",
}
var xprttcpFields = []string{
// "port",
"bind_count",
"connect_count",
"connect_time",
"idle_time",
"rpcsends",
"rpcreceives",
"badxids",
"inflightsends",
"backlogutil",
}
var nfs3Fields = []string{
"NULL",
"GETATTR",
"SETATTR",
"LOOKUP",
"ACCESS",
"READLINK",
"READ",
"WRITE",
"CREATE",
"MKDIR",
"SYMLINK",
"MKNOD",
"REMOVE",
"RMDIR",
"RENAME",
"LINK",
"READDIR",
"READDIRPLUS",
"FSSTAT",
"FSINFO",
"PATHCONF",
"COMMIT",
}
var nfs4Fields = []string{
"NULL",
"READ",
"WRITE",
"COMMIT",
"OPEN",
"OPEN_CONFIRM",
"OPEN_NOATTR",
"OPEN_DOWNGRADE",
"CLOSE",
"SETATTR",
"FSINFO",
"RENEW",
"SETCLIENTID",
"SETCLIENTID_CONFIRM",
"LOCK",
"LOCKT",
"LOCKU",
"ACCESS",
"GETATTR",
"LOOKUP",
"LOOKUP_ROOT",
"REMOVE",
"RENAME",
"LINK",
"SYMLINK",
"CREATE",
"PATHCONF",
"STATFS",
"READLINK",
"READDIR",
"SERVER_CAPS",
"DELEGRETURN",
"GETACL",
"SETACL",
"FS_LOCATIONS",
"RELEASE_LOCKOWNER",
"SECINFO",
"FSID_PRESENT",
"EXCHANGE_ID",
"CREATE_SESSION",
"DESTROY_SESSION",
"SEQUENCE",
"GET_LEASE_TIME",
"RECLAIM_COMPLETE",
"LAYOUTGET",
"GETDEVICEINFO",
"LAYOUTCOMMIT",
"LAYOUTRETURN",
"SECINFO_NO_NAME",
"TEST_STATEID",
"FREE_STATEID",
"GETDEVICELIST",
"BIND_CONN_TO_SESSION",
"DESTROY_CLIENTID",
"SEEK",
"ALLOCATE",
"DEALLOCATE",
"LAYOUTSTATS",
"CLONE",
}
var nfsopFields = []string {
"ops",
"trans",
"timeouts",
"bytes_sent",
"bytes_recv",
"queue_time",
"response_time",
"total_time",
}
func convert(line []string) []float64 {
var nline []float64
for _, l := range line[1:] {
f, _ := strconv.ParseFloat(l, 64)
nline = append(nline, f)
}
return nline
}
func In(list []string, val string) bool {
for _, v := range list {
if v == val {
return true
}
}
return false
}
func (n *NFS) parseStat(mountpoint string, version string, line []string, acc telegraf.Accumulator) error {
tags := map[string]string{"mountpoint": mountpoint}
nline := convert(line)
first := strings.Replace(line[0], ":", "", 1)
var fields = make(map[string]interface{})
if version == "3" || version == "4" {
if In(nfs3Fields, first) {
if first == "READ" {
fields["read_ops"] = nline[0]
fields["read_retrans"] = (nline[1] - nline[0])
fields["read_bytes"] = (nline[3] + nline[4])
fields["read_rtt"] = nline[6]
fields["read_exe"] = nline[7]
acc.AddFields("nfsstat_read", fields, tags)
} else if first == "WRITE" {
fields["write_ops"] = nline[0]
fields["write_retrans"] = (nline[1] - nline[0])
fields["write_bytes"] = (nline[3] + nline[4])
fields["write_rtt"] = nline[6]
fields["write_exe"] = nline[7]
acc.AddFields("nfsstat_write", fields, tags)
}
}
}
return nil
}
func (n *NFS) parseData(mountpoint string, version string, line []string, acc telegraf.Accumulator) error {
tags := map[string]string{"mountpoint": mountpoint}
nline := convert(line)
first := strings.Replace(line[0], ":", "", 1)
var fields = make(map[string]interface{})
if first == "events" {
for i,t := range eventsFields {
fields[t] = nline[i]
}
acc.AddFields("nfs_events", fields, tags)
} else if first == "bytes" {
for i,t := range bytesFields {
fields[t] = nline[i]
}
acc.AddFields("nfs_bytes", fields, tags)
} else if first == "xprt" {
switch line[1] {
case "tcp": {
for i,t := range xprttcpFields {
fields[t] = nline[i+2]
}
acc.AddFields("nfs_xprttcp", fields, tags)
}
case "udp": {
for i,t := range xprtudpFields {
fields[t] = nline[i+2]
}
acc.AddFields("nfs_xprtudp", fields, tags)
}
}
} else if version == "3" {
if In(nfs3Fields, first) {
for i , t := range nline {
item := fmt.Sprintf("%s_%s", first, nfsopFields[i])
fields[item] = t
}
acc.AddFields("nfs_ops", fields, tags)
}
} else if version == "4" {
if In(nfs4Fields, first) {
for i , t := range nline {
item := fmt.Sprintf("%s_%s", first, nfsopFields[i])
fields[item] = t
}
acc.AddFields("nfs_ops", fields, tags)
}
}
return nil
}
func (n *NFS) processText(scanner *bufio.Scanner, acc telegraf.Accumulator) error {
var device string
var version string
for scanner.Scan() {
line := strings.Fields(scanner.Text())
if In(line, "fstype") && In(line, "nfs") {
device = fmt.Sprintf("%s %s", line[1], line[4])
} else if In(line, "(nfs)") {
version = strings.Split(line[5], "/")[1]
}
if (len(line) > 0) {
if n.Iostat == true {
n.parseStat(device, version, line, acc)
}
if n.Fullstat == true {
n.parseData(device, version, line, acc)
}
}
}
return nil
}
func (n *NFS) Gather(acc telegraf.Accumulator) error {
var outerr error
file, err := os.Open("/proc/self/mountstats")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
n.processText(scanner, acc)
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
return outerr
}
func init() {
inputs.Add("nfs", func() telegraf.Input {
return &NFS{}
})
}