package execd

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os/exec"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
)

const sampleConfig = `
  ## Program to run as daemon
  command = ["telegraf-smartctl", "-d", "/dev/sda"]

  ## Define how the process is signaled on each collection interval.
  ## Valid values are:
  ##   "none"   : Do not signal anything.
  ##              The process must output metrics by itself.
  ##   "STDIN"   : Send a newline on STDIN.
  ##   "SIGHUP"  : Send a HUP signal. Not available on Windows.
  ##   "SIGUSR1" : Send a USR1 signal. Not available on Windows.
  ##   "SIGUSR2" : Send a USR2 signal. Not available on Windows.
  signal = "none"

  ## Delay before the process is restarted after an unexpected termination
  restart_delay = "10s"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`

type Execd struct {
	Command      []string
	Signal       string
	RestartDelay internal.Duration

	acc    telegraf.Accumulator
	cmd    *exec.Cmd
	parser parsers.Parser
	stdin  io.WriteCloser
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (e *Execd) SampleConfig() string {
	return sampleConfig
}

func (e *Execd) Description() string {
	return "Run executable as long-running input plugin"
}

func (e *Execd) SetParser(parser parsers.Parser) {
	e.parser = parser
}

func (e *Execd) Start(acc telegraf.Accumulator) error {
	e.acc = acc

	if len(e.Command) == 0 {
		return fmt.Errorf("E! [inputs.execd] FATAL no command specified")
	}

	e.wg.Add(1)

	var ctx context.Context
	ctx, e.cancel = context.WithCancel(context.Background())

	go func() {
		e.cmdLoop(ctx)
		e.wg.Done()
	}()

	return nil
}

func (e *Execd) Stop() {
	e.cancel()
	e.wg.Wait()
}

func (e *Execd) cmdLoop(ctx context.Context) {
	for {
		// Use a buffered channel to ensure goroutine below can exit
		// if `ctx.Done` is selected and nothing reads on `done` anymore
		done := make(chan error, 1)
		go func() {
			done <- e.cmdRun()
		}()

		select {
		case <-ctx.Done():
			e.stdin.Close()
			// Immediately exit process but with a graceful shutdown
			// period before killing
			internal.WaitTimeout(e.cmd, 200*time.Millisecond)
			return
		case err := <-done:
			log.Printf("E! [inputs.execd] Process %s terminated: %s", e.Command, err)
		}

		log.Printf("E! [inputs.execd] Restarting in %s...", e.RestartDelay.Duration)

		select {
		case <-ctx.Done():
			return
		case <-time.After(e.RestartDelay.Duration):
			// Continue the loop and restart the process
		}
	}
}

func (e *Execd) cmdRun() error {
	var wg sync.WaitGroup

	if len(e.Command) > 1 {
		e.cmd = exec.Command(e.Command[0], e.Command[1:]...)
	} else {
		e.cmd = exec.Command(e.Command[0])
	}

	stdin, err := e.cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("E! [inputs.execd] Error opening stdin pipe: %s", err)
	}

	e.stdin = stdin

	stdout, err := e.cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("E! [inputs.execd] Error opening stdout pipe: %s", err)
	}

	stderr, err := e.cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("E! [inputs.execd] Error opening stderr pipe: %s", err)
	}

	log.Printf("D! [inputs.execd] Starting process: %s", e.Command)

	err = e.cmd.Start()
	if err != nil {
		return fmt.Errorf("E! [inputs.execd] Error starting process: %s", err)
	}

	wg.Add(2)

	go func() {
		e.cmdReadOut(stdout)
		wg.Done()
	}()

	go func() {
		e.cmdReadErr(stderr)
		wg.Done()
	}()

	wg.Wait()
	return e.cmd.Wait()
}

func (e *Execd) cmdReadOut(out io.Reader) {
	scanner := bufio.NewScanner(out)

	for scanner.Scan() {
		metrics, err := e.parser.Parse(scanner.Bytes())
		if err != nil {
			e.acc.AddError(fmt.Errorf("E! [inputs.execd] Parse error: %s", err))
		}

		for _, metric := range metrics {
			e.acc.AddMetric(metric)
		}
	}

	if err := scanner.Err(); err != nil {
		e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stdout: %s", err))
	}
}

func (e *Execd) cmdReadErr(out io.Reader) {
	scanner := bufio.NewScanner(out)

	for scanner.Scan() {
		log.Printf("E! [inputs.execd] stderr: %q", scanner.Text())
	}

	if err := scanner.Err(); err != nil {
		e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stderr: %s", err))
	}
}

func init() {
	inputs.Add("execd", func() telegraf.Input {
		return &Execd{
			Signal:       "none",
			RestartDelay: internal.Duration{Duration: 10 * time.Second},
		}
	})
}