package execd

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os/exec"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
)

const sampleConfig = `
  ## Program to run as daemon
  command = ["telegraf-smartctl", "-d", "/dev/sda"]

  ## Define how the process is signaled on each collection interval.
  ## Valid values are:
  ##   "none"   : Do not signal anything.
  ##              The process must output metrics by itself.
  ##   "STDIN"   : Send a newline on STDIN.
  ##   "SIGHUP"  : Send a HUP signal. Not available on Windows.
  ##   "SIGUSR1" : Send a USR1 signal. Not available on Windows.
  ##   "SIGUSR2" : Send a USR2 signal. Not available on Windows.
  signal = "none"

  ## Delay before the process is restarted after an unexpected termination
  restart_delay = "10s"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`

type Execd struct {
	Command      []string
	Signal       string
	RestartDelay config.Duration

	acc        telegraf.Accumulator
	cmd        *exec.Cmd
	parser     parsers.Parser
	stdin      io.WriteCloser
	stdout     io.ReadCloser
	stderr     io.ReadCloser
	cancel     context.CancelFunc
	mainLoopWg sync.WaitGroup
}

func (e *Execd) SampleConfig() string {
	return sampleConfig
}

func (e *Execd) Description() string {
	return "Run executable as long-running input plugin"
}

func (e *Execd) SetParser(parser parsers.Parser) {
	e.parser = parser
}

func (e *Execd) Start(acc telegraf.Accumulator) error {
	e.acc = acc

	if len(e.Command) == 0 {
		return fmt.Errorf("FATAL no command specified")
	}

	e.mainLoopWg.Add(1)

	ctx, cancel := context.WithCancel(context.Background())
	e.cancel = cancel

	if err := e.cmdStart(); err != nil {
		return err
	}

	go func() {
		if err := e.cmdLoop(ctx); err != nil {
			log.Printf("Process quit with message: %s", err.Error())
		}
		e.mainLoopWg.Done()
	}()

	return nil
}

func (e *Execd) Stop() {
	// don't try to stop before all stream readers have started.
	e.cancel()
	e.mainLoopWg.Wait()
}

// cmdLoop watches an already running process, restarting it when appropriate.
func (e *Execd) cmdLoop(ctx context.Context) error {
	for {
		// Use a buffered channel to ensure goroutine below can exit
		// if `ctx.Done` is selected and nothing reads on `done` anymore
		done := make(chan error, 1)
		go func() {
			done <- e.cmdWait()
		}()

		select {
		case <-ctx.Done():
			if e.stdin != nil {
				e.stdin.Close()
				gracefulStop(e.cmd, 5*time.Second)
			}
			return nil
		case err := <-done:
			log.Printf("Process %s terminated: %s", e.Command, err)
			if isQuitting(ctx) {
				return err
			}
		}

		log.Printf("Restarting in %s...", time.Duration(e.RestartDelay))

		select {
		case <-ctx.Done():
			return nil
		case <-time.After(time.Duration(e.RestartDelay)):
			// Continue the loop and restart the process
			if err := e.cmdStart(); err != nil {
				return err
			}
		}
	}
}

func isQuitting(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func (e *Execd) cmdStart() (err error) {
	if len(e.Command) > 1 {
		e.cmd = exec.Command(e.Command[0], e.Command[1:]...)
	} else {
		e.cmd = exec.Command(e.Command[0])
	}

	e.stdin, err = e.cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("Error opening stdin pipe: %s", err)
	}

	e.stdout, err = e.cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("Error opening stdout pipe: %s", err)
	}

	e.stderr, err = e.cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("Error opening stderr pipe: %s", err)
	}

	log.Printf("Starting process: %s", e.Command)

	err = e.cmd.Start()
	if err != nil {
		return fmt.Errorf("Error starting process: %s", err)
	}

	return nil
}

func (e *Execd) cmdWait() error {
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		e.cmdReadOut(e.stdout)
		wg.Done()
	}()

	go func() {
		e.cmdReadErr(e.stderr)
		wg.Done()
	}()

	wg.Wait()
	return e.cmd.Wait()
}

func (e *Execd) cmdReadOut(out io.Reader) {
	if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
		// work around the lack of built-in streaming parser. :(
		e.cmdReadOutStream(out)
		return
	}

	scanner := bufio.NewScanner(out)

	for scanner.Scan() {
		metrics, err := e.parser.Parse(scanner.Bytes())
		if err != nil {
			e.acc.AddError(fmt.Errorf("Parse error: %s", err))
		}

		for _, metric := range metrics {
			e.acc.AddMetric(metric)
		}
	}

	if err := scanner.Err(); err != nil {
		e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err))
	}
}

func (e *Execd) cmdReadOutStream(out io.Reader) {
	parser := influx.NewStreamParser(out)

	for {
		metric, err := parser.Next()
		if err != nil {
			if err == influx.EOF {
				break // stream ended
			}
			if parseErr, isParseError := err.(*influx.ParseError); isParseError {
				// parse error.
				e.acc.AddError(parseErr)
				continue
			}
			// some non-recoverable error?
			e.acc.AddError(err)
			return
		}

		e.acc.AddMetric(metric)
	}
}

func (e *Execd) cmdReadErr(out io.Reader) {
	scanner := bufio.NewScanner(out)

	for scanner.Scan() {
		log.Printf("stderr: %q", scanner.Text())
	}

	if err := scanner.Err(); err != nil {
		e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err))
	}
}

func init() {
	inputs.Add("execd", func() telegraf.Input {
		return &Execd{
			Signal:       "none",
			RestartDelay: config.Duration(10 * time.Second),
		}
	})
}