Add LeoFS plugin

This commit is contained in:
mocchira 2015-08-07 08:58:24 +00:00 committed by Cameron Sparr
parent b70f821a10
commit fc95e8401a
3 changed files with 402 additions and 0 deletions

View File

@ -6,6 +6,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/leofs"
_ "github.com/influxdb/telegraf/plugins/lustre2" _ "github.com/influxdb/telegraf/plugins/lustre2"
_ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb" _ "github.com/influxdb/telegraf/plugins/mongodb"

228
plugins/leofs/leofs.go Normal file
View File

@ -0,0 +1,228 @@
package leofs
import (
"bufio"
"fmt"
"github.com/influxdb/telegraf/plugins"
"net/url"
"os/exec"
"strconv"
"strings"
"sync"
)
const oid = ".1.3.6.1.4.1.35450"
// For Manager Master
const defaultEndpoint = "127.0.0.1:4020"
type ServerType int
const (
ServerTypeManagerMaster ServerType = iota
ServerTypeManagerSlave
ServerTypeStorage
ServerTypeGateway
)
type LeoFS struct {
Servers []string
}
var KeyMapping = map[ServerType][]string{
ServerTypeManagerMaster: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
ServerTypeManagerSlave: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
ServerTypeStorage: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"num_of_writes",
"num_of_reads",
"num_of_deletes",
"num_of_writes_5min",
"num_of_reads_5min",
"num_of_deletes_5min",
"num_of_active_objects",
"total_objects",
"total_size_of_active_objects",
"total_size",
"num_of_replication_messages",
"num_of_sync-vnode_messages",
"num_of_rebalance_messages",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
ServerTypeGateway: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"num_of_writes",
"num_of_reads",
"num_of_deletes",
"num_of_writes_5min",
"num_of_reads_5min",
"num_of_deletes_5min",
"count_of_cache-hit",
"count_of_cache-miss",
"total_of_files",
"total_cached_size",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
}
var serverTypeMapping = map[string]ServerType{
"4020": ServerTypeManagerMaster,
"4021": ServerTypeManagerSlave,
"4010": ServerTypeStorage,
"4011": ServerTypeStorage,
"4012": ServerTypeStorage,
"4013": ServerTypeStorage,
"4000": ServerTypeGateway,
"4001": ServerTypeGateway,
}
var sampleConfig = `
# An array of URI to gather stats about LeoFS.
# Specify an ip or hostname with port. ie 127.0.0.1:4020
#
# If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port.
servers = ["127.0.0.1:4021"]
`
func (l *LeoFS) SampleConfig() string {
return sampleConfig
}
func (l *LeoFS) Description() string {
return "Read metrics from a LeoFS Server via SNMP"
}
func (l *LeoFS) Gather(acc plugins.Accumulator) error {
if len(l.Servers) == 0 {
l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc)
return nil
}
var wg sync.WaitGroup
var outerr error
for _, endpoint := range l.Servers {
_, err := url.Parse(endpoint)
if err != nil {
return fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err)
}
port, err := retrieveTokenAfterColon(endpoint)
if err != nil {
return err
}
st, ok := serverTypeMapping[port]
if !ok {
st = ServerTypeStorage
}
wg.Add(1)
go func(endpoint string, st ServerType) {
defer wg.Done()
outerr = l.gatherServer(endpoint, st, acc)
}(endpoint, st)
}
wg.Wait()
return outerr
}
func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc plugins.Accumulator) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
cmd.Start()
defer cmd.Wait()
scanner := bufio.NewScanner(stdout)
if !scanner.Scan() {
return fmt.Errorf("Unable to retrieve the node name")
}
nodeName, err := retrieveTokenAfterColon(scanner.Text())
if err != nil {
return err
}
nodeNameTrimmed := strings.Trim(nodeName, "\"")
tags := map[string]string{
"node": nodeNameTrimmed,
}
i := 0
for scanner.Scan() {
key := KeyMapping[serverType][i]
val, err := retrieveTokenAfterColon(scanner.Text())
if err != nil {
return err
}
fVal, err := strconv.ParseFloat(val, 64)
if err != nil {
return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err)
}
acc.Add(key, fVal, tags)
i++
}
return nil
}
func retrieveTokenAfterColon(line string) (string, error) {
tokens := strings.Split(line, ":")
if len(tokens) != 2 {
return "", fmt.Errorf("':' not found in the line:%s", line)
}
return strings.TrimSpace(tokens[1]), nil
}
func init() {
plugins.Add("leofs", func() plugins.Plugin {
return &LeoFS{}
})
}

173
plugins/leofs/leofs_test.go Normal file
View File

@ -0,0 +1,173 @@
package leofs
import (
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
"log"
"os"
"os/exec"
"testing"
)
var fakeSNMP4Manager = `
package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.15.1.0 = STRING: "manager_888@127.0.0.1"
iso.3.6.1.4.1.35450.15.2.0 = Gauge32: 186
iso.3.6.1.4.1.35450.15.3.0 = Gauge32: 46235519
iso.3.6.1.4.1.35450.15.4.0 = Gauge32: 32168525
iso.3.6.1.4.1.35450.15.5.0 = Gauge32: 14066068
iso.3.6.1.4.1.35450.15.6.0 = Gauge32: 5512968
iso.3.6.1.4.1.35450.15.7.0 = Gauge32: 186
iso.3.6.1.4.1.35450.15.8.0 = Gauge32: 46269006
iso.3.6.1.4.1.35450.15.9.0 = Gauge32: 32202867
iso.3.6.1.4.1.35450.15.10.0 = Gauge32: 14064995
iso.3.6.1.4.1.35450.15.11.0 = Gauge32: 5492634
iso.3.6.1.4.1.35450.15.12.0 = Gauge32: 60
iso.3.6.1.4.1.35450.15.13.0 = Gauge32: 43515904
iso.3.6.1.4.1.35450.15.14.0 = Gauge32: 60
iso.3.6.1.4.1.35450.15.15.0 = Gauge32: 43533983` + "`" +
`
func main() {
fmt.Println(output)
}
`
var fakeSNMP4Storage = `
package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.34.1.0 = STRING: "storage_0@127.0.0.1"
iso.3.6.1.4.1.35450.34.2.0 = Gauge32: 512
iso.3.6.1.4.1.35450.34.3.0 = Gauge32: 38126307
iso.3.6.1.4.1.35450.34.4.0 = Gauge32: 22308716
iso.3.6.1.4.1.35450.34.5.0 = Gauge32: 15816448
iso.3.6.1.4.1.35450.34.6.0 = Gauge32: 5232008
iso.3.6.1.4.1.35450.34.7.0 = Gauge32: 512
iso.3.6.1.4.1.35450.34.8.0 = Gauge32: 38113176
iso.3.6.1.4.1.35450.34.9.0 = Gauge32: 22313398
iso.3.6.1.4.1.35450.34.10.0 = Gauge32: 15798779
iso.3.6.1.4.1.35450.34.11.0 = Gauge32: 5237315
iso.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
iso.3.6.1.4.1.35450.34.13.0 = Gauge32: 824
iso.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
iso.3.6.1.4.1.35450.34.16.0 = Gauge32: 196654
iso.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.18.0 = Gauge32: 2052
iso.3.6.1.4.1.35450.34.19.0 = Gauge32: 50296
iso.3.6.1.4.1.35450.34.20.0 = Gauge32: 35
iso.3.6.1.4.1.35450.34.21.0 = Gauge32: 898
iso.3.6.1.4.1.35450.34.22.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.23.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.24.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.31.0 = Gauge32: 51
iso.3.6.1.4.1.35450.34.32.0 = Gauge32: 53219328
iso.3.6.1.4.1.35450.34.33.0 = Gauge32: 51
iso.3.6.1.4.1.35450.34.34.0 = Gauge32: 53351083` + "`" +
`
func main() {
fmt.Println(output)
}
`
var fakeSNMP4Gateway = `
package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.34.1.0 = STRING: "gateway_0@127.0.0.1"
iso.3.6.1.4.1.35450.34.2.0 = Gauge32: 465
iso.3.6.1.4.1.35450.34.3.0 = Gauge32: 61676335
iso.3.6.1.4.1.35450.34.4.0 = Gauge32: 46890415
iso.3.6.1.4.1.35450.34.5.0 = Gauge32: 14785011
iso.3.6.1.4.1.35450.34.6.0 = Gauge32: 5578855
iso.3.6.1.4.1.35450.34.7.0 = Gauge32: 465
iso.3.6.1.4.1.35450.34.8.0 = Gauge32: 61644426
iso.3.6.1.4.1.35450.34.9.0 = Gauge32: 46880358
iso.3.6.1.4.1.35450.34.10.0 = Gauge32: 14763002
iso.3.6.1.4.1.35450.34.11.0 = Gauge32: 5582125
iso.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
iso.3.6.1.4.1.35450.34.13.0 = Gauge32: 827
iso.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
iso.3.6.1.4.1.35450.34.16.0 = Gauge32: 196650
iso.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.18.0 = Gauge32: 30256
iso.3.6.1.4.1.35450.34.19.0 = Gauge32: 532158
iso.3.6.1.4.1.35450.34.20.0 = Gauge32: 34
iso.3.6.1.4.1.35450.34.21.0 = Gauge32: 1
iso.3.6.1.4.1.35450.34.31.0 = Gauge32: 53
iso.3.6.1.4.1.35450.34.32.0 = Gauge32: 55050240
iso.3.6.1.4.1.35450.34.33.0 = Gauge32: 53
iso.3.6.1.4.1.35450.34.34.0 = Gauge32: 55186538` + "`" +
`
func main() {
fmt.Println(output)
}
`
func makeFakeSNMPSrc(code string) string {
path := os.TempDir() + "/test.go"
err := ioutil.WriteFile(path, []byte(code), 0600)
if err != nil {
log.Fatalln(err)
}
return path
}
func buildFakeSNMPCmd(src string) {
err := exec.Command("go", "build", "-o", "snmpwalk", src).Run()
if err != nil {
log.Fatalln(err)
}
}
func testMain(t *testing.T, code string, endpoint string, serverType ServerType) {
// Build the fake snmpwalk for test
src := makeFakeSNMPSrc(code)
defer os.Remove(src)
buildFakeSNMPCmd(src)
defer os.Remove("./snmpwalk")
envPathOrigin := os.Getenv("PATH")
// Refer to the fake snmpwalk
os.Setenv("PATH", ".")
defer os.Setenv("PATH", envPathOrigin)
l := &LeoFS{
Servers: []string{endpoint},
}
var acc testutil.Accumulator
err := l.Gather(&acc)
require.NoError(t, err)
floatMetrics := KeyMapping[serverType]
for _, metric := range floatMetrics {
assert.True(t, acc.HasFloatValue(metric), metric)
}
}
func TestLeoFSManagerMasterMetrics(t *testing.T) {
testMain(t, fakeSNMP4Manager, "localhost:4020", ServerTypeManagerMaster)
}
func TestLeoFSManagerSlaveMetrics(t *testing.T) {
testMain(t, fakeSNMP4Manager, "localhost:4021", ServerTypeManagerSlave)
}
func TestLeoFSStorageMetrics(t *testing.T) {
testMain(t, fakeSNMP4Storage, "localhost:4010", ServerTypeStorage)
}
func TestLeoFSGatewayMetrics(t *testing.T) {
testMain(t, fakeSNMP4Gateway, "localhost:4000", ServerTypeGateway)
}