package amqp_consumer

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/streadway/amqp"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
)

// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
	URL string
	// AMQP exchange
	Exchange string
	// Queue Name
	Queue string
	// Binding Key
	BindingKey string `toml:"binding_key"`

	// Controls how many messages the server will try to keep on the network
	// for consumers before receiving delivery acks.
	PrefetchCount int

	// AMQP Auth method
	AuthMethod string
	// Path to CA file
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl_key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool

	parser parsers.Parser
	conn   *amqp.Connection
	wg     *sync.WaitGroup
}

type externalAuth struct{}

func (a *externalAuth) Mechanism() string {
	return "EXTERNAL"
}
func (a *externalAuth) Response() string {
	return fmt.Sprintf("\000")
}

const (
	DefaultAuthMethod    = "PLAIN"
	DefaultPrefetchCount = 50
)

func (a *AMQPConsumer) SampleConfig() string {
	return `
  ## AMQP url
  url = "amqp://localhost:5672/influxdb"
  ## AMQP exchange
  exchange = "telegraf"
  ## AMQP queue name
  queue = "telegraf"
  ## Binding Key
  binding_key = "#"

  ## Maximum number of messages server should give to the worker.
  prefetch_count = 50

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://www.rabbitmq.com/plugins.html
  # auth_method = "PLAIN"

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "influx"
`
}

func (a *AMQPConsumer) Description() string {
	return "AMQP consumer plugin"
}

func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
	a.parser = parser
}

// All gathering is done in the Start function
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
	return nil
}

func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
	// make new tls config
	tls, err := internal.GetTLSConfig(
		a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
	if err != nil {
		return nil, err
	}

	// parse auth method
	var sasl []amqp.Authentication // nil by default

	if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
		sasl = []amqp.Authentication{&externalAuth{}}
	}

	config := amqp.Config{
		TLSClientConfig: tls,
		SASL:            sasl, // if nil, it will be PLAIN
	}
	return &config, nil
}

// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
	amqpConf, err := a.createConfig()
	if err != nil {
		return err
	}

	msgs, err := a.connect(amqpConf)
	if err != nil {
		return err
	}

	a.wg = &sync.WaitGroup{}
	a.wg.Add(1)
	go a.process(msgs, acc)

	go func() {
		err := <-a.conn.NotifyClose(make(chan *amqp.Error))
		if err == nil {
			return
		}

		log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
		for {
			msgs, err := a.connect(amqpConf)
			if err != nil {
				log.Printf("E! AMQP connection failed: %s", err)
				time.Sleep(10 * time.Second)
				continue
			}

			a.wg.Add(1)
			go a.process(msgs, acc)
			break
		}
	}()

	return nil
}

func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
	conn, err := amqp.DialConfig(a.URL, *amqpConf)
	if err != nil {
		return nil, err
	}
	a.conn = conn

	ch, err := conn.Channel()
	if err != nil {
		return nil, fmt.Errorf("Failed to open a channel: %s", err)
	}

	err = ch.ExchangeDeclare(
		a.Exchange, // name
		"topic",    // type
		true,       // durable
		false,      // auto-deleted
		false,      // internal
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
	}

	q, err := ch.QueueDeclare(
		a.Queue, // queue
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Failed to declare a queue: %s", err)
	}

	err = ch.QueueBind(
		q.Name,       // queue
		a.BindingKey, // binding-key
		a.Exchange,   // exchange
		false,
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("Failed to bind a queue: %s", err)
	}

	err = ch.Qos(
		a.PrefetchCount,
		0,     // prefetch-size
		false, // global
	)
	if err != nil {
		return nil, fmt.Errorf("Failed to set QoS: %s", err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
	}

	log.Println("I! Started AMQP consumer")
	return msgs, err
}

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
	defer a.wg.Done()
	for d := range msgs {
		metrics, err := a.parser.Parse(d.Body)
		if err != nil {
			log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
		} else {
			for _, m := range metrics {
				acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
			}
		}

		d.Ack(false)
	}
	log.Printf("I! AMQP consumer queue closed")
}

func (a *AMQPConsumer) Stop() {
	err := a.conn.Close()
	if err != nil && err != amqp.ErrClosed {
		log.Printf("E! Error closing AMQP connection: %s", err)
		return
	}
	a.wg.Wait()
	log.Println("I! Stopped AMQP service")
}

func init() {
	inputs.Add("amqp_consumer", func() telegraf.Input {
		return &AMQPConsumer{
			AuthMethod:    DefaultAuthMethod,
			PrefetchCount: DefaultPrefetchCount,
		}
	})
}