package leofs

import (
	"bufio"
	"fmt"
	"log"
	"net/url"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/inputs"
)

const oid = ".1.3.6.1.4.1.35450"

// For Manager Master
const defaultEndpoint = "udp://127.0.0.1:4020"

type ServerType int

const (
	ServerTypeManagerMaster ServerType = iota
	ServerTypeManagerSlave
	ServerTypeStorage
	ServerTypeGateway
)

type LeoFS struct {
	Servers []string
}

var KeyMapping = map[ServerType][]string{
	ServerTypeManagerMaster: {
		"num_of_processes",
		"total_memory_usage",
		"system_memory_usage",
		"processes_memory_usage",
		"ets_memory_usage",
		"num_of_processes_5min",
		"total_memory_usage_5min",
		"system_memory_usage_5min",
		"processes_memory_usage_5min",
		"ets_memory_usage_5min",
		"used_allocated_memory",
		"allocated_memory",
		"used_allocated_memory_5min",
		"allocated_memory_5min",
	},
	ServerTypeManagerSlave: {
		"num_of_processes",
		"total_memory_usage",
		"system_memory_usage",
		"processes_memory_usage",
		"ets_memory_usage",
		"num_of_processes_5min",
		"total_memory_usage_5min",
		"system_memory_usage_5min",
		"processes_memory_usage_5min",
		"ets_memory_usage_5min",
		"used_allocated_memory",
		"allocated_memory",
		"used_allocated_memory_5min",
		"allocated_memory_5min",
	},
	ServerTypeStorage: {
		"num_of_processes",
		"total_memory_usage",
		"system_memory_usage",
		"processes_memory_usage",
		"ets_memory_usage",
		"num_of_processes_5min",
		"total_memory_usage_5min",
		"system_memory_usage_5min",
		"processes_memory_usage_5min",
		"ets_memory_usage_5min",
		"num_of_writes",
		"num_of_reads",
		"num_of_deletes",
		"num_of_writes_5min",
		"num_of_reads_5min",
		"num_of_deletes_5min",
		"num_of_active_objects",
		"total_objects",
		"total_size_of_active_objects",
		"total_size",
		"num_of_replication_messages",
		"num_of_sync-vnode_messages",
		"num_of_rebalance_messages",
		"used_allocated_memory",
		"allocated_memory",
		"used_allocated_memory_5min",
		"allocated_memory_5min",
	},
	ServerTypeGateway: {
		"num_of_processes",
		"total_memory_usage",
		"system_memory_usage",
		"processes_memory_usage",
		"ets_memory_usage",
		"num_of_processes_5min",
		"total_memory_usage_5min",
		"system_memory_usage_5min",
		"processes_memory_usage_5min",
		"ets_memory_usage_5min",
		"num_of_writes",
		"num_of_reads",
		"num_of_deletes",
		"num_of_writes_5min",
		"num_of_reads_5min",
		"num_of_deletes_5min",
		"count_of_cache-hit",
		"count_of_cache-miss",
		"total_of_files",
		"total_cached_size",
		"used_allocated_memory",
		"allocated_memory",
		"used_allocated_memory_5min",
		"allocated_memory_5min",
	},
}

var serverTypeMapping = map[string]ServerType{
	"4020": ServerTypeManagerMaster,
	"4021": ServerTypeManagerSlave,
	"4010": ServerTypeStorage,
	"4011": ServerTypeStorage,
	"4012": ServerTypeStorage,
	"4013": ServerTypeStorage,
	"4000": ServerTypeGateway,
	"4001": ServerTypeGateway,
}

var sampleConfig = `
  ## An array of URLs of the form:
  ##   "udp://" host [ ":" port]
  servers = ["udp://127.0.0.1:4020"]
`

func (l *LeoFS) SampleConfig() string {
	return sampleConfig
}

func (l *LeoFS) Description() string {
	return "Read metrics from a LeoFS Server via SNMP"
}

func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
	if len(l.Servers) == 0 {
		l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc)
		return nil
	}
	var wg sync.WaitGroup
	for i, endpoint := range l.Servers {
		if !strings.HasPrefix(endpoint, "udp://") {
			// Preserve backwards compatibility for hostnames without a
			// scheme, broken in go 1.8. Remove in Telegraf 2.0
			endpoint = "udp://" + endpoint
			log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
			l.Servers[i] = endpoint
		}
		u, err := url.Parse(endpoint)
		if err != nil {
			acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
			continue
		}
		if u.Host == "" {
			acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
			continue
		}

		port := u.Port()
		if port == "" {
			port = "4020"
		}
		st, ok := serverTypeMapping[port]
		if !ok {
			st = ServerTypeStorage
		}
		wg.Add(1)
		go func(endpoint string, st ServerType) {
			defer wg.Done()
			acc.AddError(l.gatherServer(endpoint, st, acc))
		}(endpoint, st)
	}
	wg.Wait()
	return nil
}

func (l *LeoFS) gatherServer(
	endpoint string,
	serverType ServerType,
	acc telegraf.Accumulator,
) error {
	cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	cmd.Start()
	defer internal.WaitTimeout(cmd, time.Second*5)
	scanner := bufio.NewScanner(stdout)
	if !scanner.Scan() {
		return fmt.Errorf("Unable to retrieve the node name")
	}
	nodeName, err := retrieveTokenAfterColon(scanner.Text())
	if err != nil {
		return err
	}
	nodeNameTrimmed := strings.Trim(nodeName, "\"")
	tags := map[string]string{
		"node": nodeNameTrimmed,
	}
	i := 0

	fields := make(map[string]interface{})
	for scanner.Scan() {
		key := KeyMapping[serverType][i]
		val, err := retrieveTokenAfterColon(scanner.Text())
		if err != nil {
			return err
		}
		fVal, err := strconv.ParseFloat(val, 64)
		if err != nil {
			return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err)
		}
		fields[key] = fVal
		i++
	}
	acc.AddFields("leofs", fields, tags)
	return nil
}

func retrieveTokenAfterColon(line string) (string, error) {
	tokens := strings.Split(line, ":")
	if len(tokens) != 2 {
		return "", fmt.Errorf("':' not found in the line:%s", line)
	}
	return strings.TrimSpace(tokens[1]), nil
}

func init() {
	inputs.Add("leofs", func() telegraf.Input {
		return &LeoFS{}
	})
}