package telegraf

import (
	"errors"
	"fmt"
	"log"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/influxdb/telegraf/outputs"
	"github.com/influxdb/telegraf/plugins"
)

type runningOutput struct {
	name   string
	output outputs.Output
}

type runningPlugin struct {
	name   string
	plugin plugins.Plugin
	config *ConfiguredPlugin
}

// Agent runs telegraf and collects data based on the given config
type Agent struct {

	// Interval at which to gather information
	Interval Duration

	// Option for outputting data in UTC
	UTC bool `toml:"utc"`

	// Precision to write data at
	// Valid values for Precision are n, u, ms, s, m, and h
	Precision string

	// Option for running in debug mode
	Debug    bool
	Hostname string

	Config *Config

	outputs []*runningOutput
	plugins []*runningPlugin
}

// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *Config) (*Agent, error) {
	agent := &Agent{
		Config:    config,
		Interval:  Duration{10 * time.Second},
		UTC:       true,
		Precision: "s",
	}

	// Apply the toml table to the agent config, overriding defaults
	err := config.ApplyAgent(agent)
	if err != nil {
		return nil, err
	}

	if agent.Hostname == "" {
		hostname, err := os.Hostname()
		if err != nil {
			return nil, err
		}

		agent.Hostname = hostname
	}

	if config.Tags == nil {
		config.Tags = map[string]string{}
	}

	config.Tags["host"] = agent.Hostname

	return agent, nil
}

// Connect connects to all configured outputs
func (a *Agent) Connect() error {
	for _, o := range a.outputs {
		if a.Debug {
			log.Printf("Attempting connection to output: %s\n", o.name)
		}
		err := o.output.Connect()
		if err != nil {
			return err
		}
		if a.Debug {
			log.Printf("Successfully connected to output: %s\n", o.name)
		}
	}
	return nil
}

// Close closes the connection to all configured outputs
func (a *Agent) Close() error {
	var err error
	for _, o := range a.outputs {
		err = o.output.Close()
	}
	return err
}

// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) {
	var names []string

	for _, name := range a.Config.OutputsDeclared() {
		creator, ok := outputs.Outputs[name]
		if !ok {
			return nil, fmt.Errorf("Undefined but requested output: %s", name)
		}

		output := creator()

		err := a.Config.ApplyOutput(name, output)
		if err != nil {
			return nil, err
		}

		a.outputs = append(a.outputs, &runningOutput{name, output})
		names = append(names, name)
	}

	sort.Strings(names)

	return names, nil
}

// LoadPlugins loads the agent's plugins
func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
	var names []string
	var filters []string

	pluginsFilter = strings.TrimSpace(pluginsFilter)
	if pluginsFilter != "" {
		filters = strings.Split(":"+pluginsFilter+":", ":")
	}

	for _, name := range a.Config.PluginsDeclared() {
		creator, ok := plugins.Plugins[name]
		if !ok {
			return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
		}

		isPluginEnabled := false
		if len(filters) > 0 {
			for _, runeValue := range filters {
				if runeValue != "" && strings.ToLower(runeValue) == strings.ToLower(name) {
					fmt.Printf("plugin [%s] is enabled (filter options)\n", name)
					isPluginEnabled = true
					break
				}
			}
		} else {
			// if no filter, we ALWAYS accept the plugin
			isPluginEnabled = true
		}

		if isPluginEnabled {
			plugin := creator()
			config, err := a.Config.ApplyPlugin(name, plugin)
			if err != nil {
				return nil, err
			}

			a.plugins = append(a.plugins, &runningPlugin{name, plugin, config})
			names = append(names, name)
		}
	}

	sort.Strings(names)

	return names, nil
}

// crankParallel runs the plugins that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) crankParallel() error {
	points := make(chan *BatchPoints, len(a.plugins))

	var wg sync.WaitGroup

	for _, plugin := range a.plugins {
		if plugin.config.Interval != 0 {
			continue
		}

		wg.Add(1)
		go func(plugin *runningPlugin) {
			defer wg.Done()

			var bp BatchPoints
			bp.Debug = a.Debug
			bp.Prefix = plugin.name + "_"
			bp.Config = plugin.config
			bp.Precision = a.Precision

			if err := plugin.plugin.Gather(&bp); err != nil {
				log.Printf("Error in plugin [%s]: %s", plugin.name, err)
			}

			points <- &bp
		}(plugin)
	}

	wg.Wait()

	close(points)

	var bp BatchPoints
	bp.Time = time.Now()
	if a.UTC {
		bp.Time = bp.Time.UTC()
	}
	bp.Tags = a.Config.Tags
	bp.Precision = a.Precision

	for sub := range points {
		bp.Points = append(bp.Points, sub.Points...)
	}

	return a.flush(&bp)
}

// crank is mostly for test purposes.
func (a *Agent) crank() error {
	var bp BatchPoints

	bp.Debug = a.Debug
	bp.Precision = a.Precision

	for _, plugin := range a.plugins {
		bp.Prefix = plugin.name + "_"
		bp.Config = plugin.config
		err := plugin.plugin.Gather(&bp)
		if err != nil {
			return err
		}
	}

	bp.Tags = a.Config.Tags
	bp.Time = time.Now()
	if a.UTC {
		bp.Time = bp.Time.UTC()
	}

	return a.flush(&bp)
}

// crankSeparate runs the plugins that have been configured with their own
// reporting interval.
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
	ticker := time.NewTicker(plugin.config.Interval)

	for {
		var bp BatchPoints
		var outerr error

		bp.Debug = a.Debug

		bp.Prefix = plugin.name + "_"
		bp.Config = plugin.config
		bp.Precision = a.Precision

		if err := plugin.plugin.Gather(&bp); err != nil {
			log.Printf("Error in plugin [%s]: %s", plugin.name, err)
			outerr = errors.New("Error encountered processing plugins & outputs")
		}

		bp.Tags = a.Config.Tags
		bp.Time = time.Now()
		if a.UTC {
			bp.Time = bp.Time.UTC()
		}

		if err := a.flush(&bp); err != nil {
			outerr = errors.New("Error encountered processing plugins & outputs")
		}

		if outerr != nil {
			return outerr
		}

		select {
		case <-shutdown:
			return nil
		case <-ticker.C:
			continue
		}
	}
}

func (a *Agent) flush(bp *BatchPoints) error {
	var wg sync.WaitGroup
	var outerr error

	for _, o := range a.outputs {
		wg.Add(1)
		go func(ro *runningOutput) {
			defer wg.Done()
			// Log all output errors:
			if err := ro.output.Write(bp.BatchPoints); err != nil {
				log.Printf("Error in output [%s]: %s", ro.name, err)
				outerr = errors.New("Error encountered flushing outputs")
			}
		}(o)
	}

	wg.Wait()
	return outerr
}

// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
	var names []string

	for name := range plugins.Plugins {
		names = append(names, name)
	}

	sort.Strings(names)

	var acc BatchPoints
	acc.Debug = true

	fmt.Printf("* Testing all plugins with default configuration\n")

	for _, name := range names {
		plugin := plugins.Plugins[name]()

		fmt.Printf("* Plugin: %s\n", name)

		acc.Prefix = name + "_"
		if err := plugin.Gather(&acc); err != nil {
			return err
		}
	}

	return nil
}

// Test verifies that we can 'Gather' from all plugins with their configured
// Config struct
func (a *Agent) Test() error {
	var acc BatchPoints

	acc.Debug = true

	for _, plugin := range a.plugins {
		acc.Prefix = plugin.name + "_"
		acc.Config = plugin.config

		fmt.Printf("* Plugin: %s\n", plugin.name)
		if plugin.config.Interval != 0 {
			fmt.Printf("* Internal: %s\n", plugin.config.Interval)
		}

		if err := plugin.plugin.Gather(&acc); err != nil {
			return err
		}
	}

	return nil
}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
	var wg sync.WaitGroup

	for _, plugin := range a.plugins {
		if plugin.config.Interval != 0 {
			wg.Add(1)
			go func(plugin *runningPlugin) {
				defer wg.Done()
				if err := a.crankSeparate(shutdown, plugin); err != nil {
					log.Printf(err.Error())
				}
			}(plugin)
		}
	}

	defer wg.Wait()

	ticker := time.NewTicker(a.Interval.Duration)

	for {
		if err := a.crankParallel(); err != nil {
			log.Printf(err.Error())
		}

		select {
		case <-shutdown:
			return nil
		case <-ticker.C:
			continue
		}
	}
}