telegraf/plugins/inputs/leofs/leofs.go

256 lines
5.8 KiB
Go

package leofs
import (
"bufio"
"fmt"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
const oid = ".1.3.6.1.4.1.35450"
// For Manager Master
const defaultEndpoint = "127.0.0.1:4020"
type ServerType int
const (
ServerTypeManagerMaster ServerType = iota
ServerTypeManagerSlave
ServerTypeStorage
ServerTypeGateway
)
type LeoFS struct {
Servers []string
}
var KeyMapping = map[ServerType][]string{
ServerTypeManagerMaster: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
ServerTypeManagerSlave: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
ServerTypeStorage: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"num_of_writes",
"num_of_reads",
"num_of_deletes",
"num_of_writes_5min",
"num_of_reads_5min",
"num_of_deletes_5min",
"num_of_active_objects",
"total_objects",
"total_size_of_active_objects",
"total_size",
"num_of_replication_messages",
"num_of_sync-vnode_messages",
"num_of_rebalance_messages",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
// following items are since LeoFS v1.4.0
"mq_num_of_msg_recovery_node",
"mq_num_of_msg_deletion_dir",
"mq_num_of_msg_async_deletion_dir",
"mq_num_of_msg_req_deletion_dir",
"mq_mdcr_num_of_msg_req_comp_metadata",
"mq_mdcr_num_of_msg_req_sync_obj",
"comp_state",
"comp_last_start_datetime",
"comp_last_end_datetime",
"comp_num_of_pending_targets",
"comp_num_of_ongoing_targets",
"comp_num_of_out_of_targets",
},
ServerTypeGateway: {
"num_of_processes",
"total_memory_usage",
"system_memory_usage",
"processes_memory_usage",
"ets_memory_usage",
"num_of_processes_5min",
"total_memory_usage_5min",
"system_memory_usage_5min",
"processes_memory_usage_5min",
"ets_memory_usage_5min",
"num_of_writes",
"num_of_reads",
"num_of_deletes",
"num_of_writes_5min",
"num_of_reads_5min",
"num_of_deletes_5min",
"count_of_cache-hit",
"count_of_cache-miss",
"total_of_files",
"total_cached_size",
"used_allocated_memory",
"allocated_memory",
"used_allocated_memory_5min",
"allocated_memory_5min",
},
}
var serverTypeMapping = map[string]ServerType{
"4020": ServerTypeManagerMaster,
"4021": ServerTypeManagerSlave,
"4010": ServerTypeStorage,
"4011": ServerTypeStorage,
"4012": ServerTypeStorage,
"4013": ServerTypeStorage,
"4000": ServerTypeGateway,
"4001": ServerTypeGateway,
}
var sampleConfig = `
## An array of URLs of the form:
## host [ ":" port]
servers = ["127.0.0.1:4020"]
`
func (l *LeoFS) SampleConfig() string {
return sampleConfig
}
func (l *LeoFS) Description() string {
return "Read metrics from a LeoFS Server via SNMP"
}
func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
if len(l.Servers) == 0 {
l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc)
return nil
}
var wg sync.WaitGroup
for _, endpoint := range l.Servers {
results := strings.Split(endpoint, ":")
port := "4020"
if len(results) > 2 {
acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
continue
} else if len(results) == 2 {
if _, err := strconv.Atoi(results[1]); err == nil {
port = results[1]
} else {
acc.AddError(fmt.Errorf("Unable to parse port from %q", endpoint))
continue
}
}
st, ok := serverTypeMapping[port]
if !ok {
st = ServerTypeStorage
}
wg.Add(1)
go func(endpoint string, st ServerType) {
defer wg.Done()
acc.AddError(l.gatherServer(endpoint, st, acc))
}(endpoint, st)
}
wg.Wait()
return nil
}
func (l *LeoFS) gatherServer(
endpoint string,
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
cmd.Start()
defer internal.WaitTimeout(cmd, time.Second*5)
scanner := bufio.NewScanner(stdout)
if !scanner.Scan() {
return fmt.Errorf("Unable to retrieve the node name")
}
nodeName, err := retrieveTokenAfterColon(scanner.Text())
if err != nil {
return err
}
nodeNameTrimmed := strings.Trim(nodeName, "\"")
tags := map[string]string{
"node": nodeNameTrimmed,
}
i := 0
fields := make(map[string]interface{})
for scanner.Scan() {
key := KeyMapping[serverType][i]
val, err := retrieveTokenAfterColon(scanner.Text())
if err != nil {
return err
}
fVal, err := strconv.ParseFloat(val, 64)
if err != nil {
return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err)
}
fields[key] = fVal
i++
}
acc.AddFields("leofs", fields, tags)
return nil
}
func retrieveTokenAfterColon(line string) (string, error) {
tokens := strings.Split(line, ":")
if len(tokens) != 2 {
return "", fmt.Errorf("':' not found in the line:%s", line)
}
return strings.TrimSpace(tokens[1]), nil
}
func init() {
inputs.Add("leofs", func() telegraf.Input {
return &LeoFS{}
})
}