package leofs import ( "bufio" "fmt" "os/exec" "strconv" "strings" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) const oid = ".1.3.6.1.4.1.35450" // For Manager Master const defaultEndpoint = "127.0.0.1:4020" type ServerType int const ( ServerTypeManagerMaster ServerType = iota ServerTypeManagerSlave ServerTypeStorage ServerTypeGateway ) type LeoFS struct { Servers []string } var KeyMapping = map[ServerType][]string{ ServerTypeManagerMaster: { "num_of_processes", "total_memory_usage", "system_memory_usage", "processes_memory_usage", "ets_memory_usage", "num_of_processes_5min", "total_memory_usage_5min", "system_memory_usage_5min", "processes_memory_usage_5min", "ets_memory_usage_5min", "used_allocated_memory", "allocated_memory", "used_allocated_memory_5min", "allocated_memory_5min", }, ServerTypeManagerSlave: { "num_of_processes", "total_memory_usage", "system_memory_usage", "processes_memory_usage", "ets_memory_usage", "num_of_processes_5min", "total_memory_usage_5min", "system_memory_usage_5min", "processes_memory_usage_5min", "ets_memory_usage_5min", "used_allocated_memory", "allocated_memory", "used_allocated_memory_5min", "allocated_memory_5min", }, ServerTypeStorage: { "num_of_processes", "total_memory_usage", "system_memory_usage", "processes_memory_usage", "ets_memory_usage", "num_of_processes_5min", "total_memory_usage_5min", "system_memory_usage_5min", "processes_memory_usage_5min", "ets_memory_usage_5min", "num_of_writes", "num_of_reads", "num_of_deletes", "num_of_writes_5min", "num_of_reads_5min", "num_of_deletes_5min", "num_of_active_objects", "total_objects", "total_size_of_active_objects", "total_size", "num_of_replication_messages", "num_of_sync-vnode_messages", "num_of_rebalance_messages", "used_allocated_memory", "allocated_memory", "used_allocated_memory_5min", "allocated_memory_5min", // following items are since LeoFS v1.4.0 "mq_num_of_msg_recovery_node", "mq_num_of_msg_deletion_dir", "mq_num_of_msg_async_deletion_dir", "mq_num_of_msg_req_deletion_dir", "mq_mdcr_num_of_msg_req_comp_metadata", "mq_mdcr_num_of_msg_req_sync_obj", "comp_state", "comp_last_start_datetime", "comp_last_end_datetime", "comp_num_of_pending_targets", "comp_num_of_ongoing_targets", "comp_num_of_out_of_targets", }, ServerTypeGateway: { "num_of_processes", "total_memory_usage", "system_memory_usage", "processes_memory_usage", "ets_memory_usage", "num_of_processes_5min", "total_memory_usage_5min", "system_memory_usage_5min", "processes_memory_usage_5min", "ets_memory_usage_5min", "num_of_writes", "num_of_reads", "num_of_deletes", "num_of_writes_5min", "num_of_reads_5min", "num_of_deletes_5min", "count_of_cache-hit", "count_of_cache-miss", "total_of_files", "total_cached_size", "used_allocated_memory", "allocated_memory", "used_allocated_memory_5min", "allocated_memory_5min", }, } var serverTypeMapping = map[string]ServerType{ "4020": ServerTypeManagerMaster, "4021": ServerTypeManagerSlave, "4010": ServerTypeStorage, "4011": ServerTypeStorage, "4012": ServerTypeStorage, "4013": ServerTypeStorage, "4000": ServerTypeGateway, "4001": ServerTypeGateway, } var sampleConfig = ` ## An array of URLs of the form: ## host [ ":" port] servers = ["127.0.0.1:4020"] ` func (l *LeoFS) SampleConfig() string { return sampleConfig } func (l *LeoFS) Description() string { return "Read metrics from a LeoFS Server via SNMP" } func (l *LeoFS) Gather(acc telegraf.Accumulator) error { if len(l.Servers) == 0 { l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc) return nil } var wg sync.WaitGroup for _, endpoint := range l.Servers { results := strings.Split(endpoint, ":") port := "4020" if len(results) > 2 { acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint)) continue } else if len(results) == 2 { if _, err := strconv.Atoi(results[1]); err == nil { port = results[1] } else { acc.AddError(fmt.Errorf("Unable to parse port from %q", endpoint)) continue } } st, ok := serverTypeMapping[port] if !ok { st = ServerTypeStorage } wg.Add(1) go func(endpoint string, st ServerType) { defer wg.Done() acc.AddError(l.gatherServer(endpoint, st, acc)) }(endpoint, st) } wg.Wait() return nil } func (l *LeoFS) gatherServer( endpoint string, serverType ServerType, acc telegraf.Accumulator, ) error { cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid) stdout, err := cmd.StdoutPipe() if err != nil { return err } cmd.Start() defer internal.WaitTimeout(cmd, time.Second*5) scanner := bufio.NewScanner(stdout) if !scanner.Scan() { return fmt.Errorf("Unable to retrieve the node name") } nodeName, err := retrieveTokenAfterColon(scanner.Text()) if err != nil { return err } nodeNameTrimmed := strings.Trim(nodeName, "\"") tags := map[string]string{ "node": nodeNameTrimmed, } i := 0 fields := make(map[string]interface{}) for scanner.Scan() { key := KeyMapping[serverType][i] val, err := retrieveTokenAfterColon(scanner.Text()) if err != nil { return err } fVal, err := strconv.ParseFloat(val, 64) if err != nil { return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err) } fields[key] = fVal i++ } acc.AddFields("leofs", fields, tags) return nil } func retrieveTokenAfterColon(line string) (string, error) { tokens := strings.Split(line, ":") if len(tokens) != 2 { return "", fmt.Errorf("':' not found in the line:%s", line) } return strings.TrimSpace(tokens[1]), nil } func init() { inputs.Add("leofs", func() telegraf.Input { return &LeoFS{} }) }