package amqp

import (
	"bytes"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/internal/tls"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers"
	"github.com/streadway/amqp"
)

const (
	DefaultURL             = "amqp://localhost:5672/influxdb"
	DefaultAuthMethod      = "PLAIN"
	DefaultExchangeType    = "topic"
	DefaultRetentionPolicy = "default"
	DefaultDatabase        = "telegraf"
)

type externalAuth struct{}

func (a *externalAuth) Mechanism() string {
	return "EXTERNAL"
}

func (a *externalAuth) Response() string {
	return fmt.Sprintf("\000")
}

type AMQP struct {
	URL                string            `toml:"url"` // deprecated in 1.7; use brokers
	Brokers            []string          `toml:"brokers"`
	Exchange           string            `toml:"exchange"`
	ExchangeType       string            `toml:"exchange_type"`
	ExchangePassive    bool              `toml:"exchange_passive"`
	ExchangeDurability string            `toml:"exchange_durability"`
	ExchangeArguments  map[string]string `toml:"exchange_arguments"`
	Username           string            `toml:"username"`
	Password           string            `toml:"password"`
	MaxMessages        int               `toml:"max_messages"`
	AuthMethod         string            `toml:"auth_method"`
	RoutingTag         string            `toml:"routing_tag"`
	RoutingKey         string            `toml:"routing_key"`
	DeliveryMode       string            `toml:"delivery_mode"`
	Database           string            `toml:"database"`         // deprecated in 1.7; use headers
	RetentionPolicy    string            `toml:"retention_policy"` // deprecated in 1.7; use headers
	Precision          string            `toml:"precision"`        // deprecated; has no effect
	Headers            map[string]string `toml:"headers"`
	Timeout            internal.Duration `toml:"timeout"`
	UseBatchFormat     bool              `toml:"use_batch_format"`
	ContentEncoding    string            `toml:"content_encoding"`
	tls.ClientConfig

	serializer   serializers.Serializer
	connect      func(*ClientConfig) (Client, error)
	client       Client
	config       *ClientConfig
	sentMessages int
	encoder      internal.ContentEncoder
}

type Client interface {
	Publish(key string, body []byte) error
	Close() error
}

var sampleConfig = `
  ## Broker to publish to.
  ##   deprecated in 1.7; use the brokers option
  # url = "amqp://localhost:5672/influxdb"

  ## Brokers to publish to.  If multiple brokers are specified a random broker
  ## will be selected anytime a connection is established.  This can be
  ## helpful for load balancing when not using a dedicated load balancer.
  brokers = ["amqp://localhost:5672/influxdb"]

  ## Maximum messages to send over a connection.  Once this is reached, the
  ## connection is closed and a new connection is made.  This can be helpful for
  ## load balancing when not using a dedicated load balancer.
  # max_messages = 0

  ## Exchange to declare and publish to.
  exchange = "telegraf"

  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
  # exchange_type = "topic"

  ## If true, exchange will be passively declared.
  # exchange_passive = false

  ## Exchange durability can be either "transient" or "durable".
  # exchange_durability = "durable"

  ## Additional exchange arguments.
  # exchange_arguments = { }
  # exchange_arguments = {"hash_propery" = "timestamp"}

  ## Authentication credentials for the PLAIN auth_method.
  # username = ""
  # password = ""

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://www.rabbitmq.com/plugins.html
  # auth_method = "PLAIN"

  ## Metric tag to use as a routing key.
  ##   ie, if this tag exists, its value will be used as the routing key
  # routing_tag = "host"

  ## Static routing key.  Used when no routing_tag is set or as a fallback
  ## when the tag specified in routing tag is not found.
  # routing_key = ""
  # routing_key = "telegraf"

  ## Delivery Mode controls if a published message is persistent.
  ##   One of "transient" or "persistent".
  # delivery_mode = "transient"

  ## InfluxDB database added as a message header.
  ##   deprecated in 1.7; use the headers option
  # database = "telegraf"

  ## InfluxDB retention policy added as a message header
  ##   deprecated in 1.7; use the headers option
  # retention_policy = "default"

  ## Static headers added to each published message.
  # headers = { }
  # headers = {"database" = "telegraf", "retention_policy" = "default"}

  ## Connection timeout.  If not provided, will default to 5s.  0s means no
  ## timeout (not recommended).
  # timeout = "5s"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## If true use batch serialization format instead of line based delimiting.
  ## Only applies to data formats which are not line based such as JSON.
  ## Recommended to set to true.
  # use_batch_format = false

  ## Content encoding for message payloads, can be set to "gzip" to or
  ## "identity" to apply no encoding.
  ##
  ## Please note that when use_batch_format = false each amqp message contains only
  ## a single metric, it is recommended to use compression with batch format
  ## for best results.
  # content_encoding = "identity"

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  # data_format = "influx"
`

func (q *AMQP) SampleConfig() string {
	return sampleConfig
}

func (q *AMQP) Description() string {
	return "Publishes metrics to an AMQP broker"
}

func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
	q.serializer = serializer
}

func (q *AMQP) Connect() error {
	if q.config == nil {
		config, err := q.makeClientConfig()
		if err != nil {
			return err
		}
		q.config = config
	}

	var err error
	q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
	if err != nil {
		return err
	}

	q.client, err = q.connect(q.config)
	if err != nil {
		return err
	}

	return nil
}

func (q *AMQP) Close() error {
	if q.client != nil {
		return q.client.Close()
	}
	return nil
}

func (q *AMQP) routingKey(metric telegraf.Metric) string {
	if q.RoutingTag != "" {
		key, ok := metric.GetTag(q.RoutingTag)
		if ok {
			return key
		}
	}
	return q.RoutingKey
}

func (q *AMQP) Write(metrics []telegraf.Metric) error {
	batches := make(map[string][]telegraf.Metric)
	if q.ExchangeType == "header" {
		// Since the routing_key is ignored for this exchange type send as a
		// single batch.
		batches[""] = metrics
	} else {
		for _, metric := range metrics {
			routingKey := q.routingKey(metric)
			if _, ok := batches[routingKey]; !ok {
				batches[routingKey] = make([]telegraf.Metric, 0)
			}

			batches[routingKey] = append(batches[routingKey], metric)
		}
	}

	first := true
	for key, metrics := range batches {
		body, err := q.serialize(metrics)
		if err != nil {
			return err
		}

		body, err = q.encoder.Encode(body)
		if err != nil {
			return err
		}

		err = q.publish(key, body)
		if err != nil {
			// If this is the first attempt to publish and the connection is
			// closed, try to reconnect and retry once.
			if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
				first = false
				q.client = nil
				err := q.publish(key, body)
				if err != nil {
					return err
				}
			} else {
				q.client = nil
				return err
			}
		}
		first = false
	}

	if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
		log.Printf("D! Output [amqp] sent MaxMessages; closing connection")
		q.client.Close()
		q.client = nil
	}

	return nil
}

func (q *AMQP) publish(key string, body []byte) error {
	if q.client == nil {
		client, err := q.connect(q.config)
		if err != nil {
			return err
		}
		q.sentMessages = 0
		q.client = client
	}

	err := q.client.Publish(key, body)
	if err != nil {
		return err
	}
	q.sentMessages++
	return nil
}

func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
	if q.UseBatchFormat {
		return q.serializer.SerializeBatch(metrics)
	} else {
		var buf bytes.Buffer
		for _, metric := range metrics {
			octets, err := q.serializer.Serialize(metric)
			if err != nil {
				log.Printf("D! [outputs.amqp] Could not serialize metric: %v", err)
				continue
			}
			_, err = buf.Write(octets)
			if err != nil {
				return nil, err
			}
		}
		body := buf.Bytes()
		return body, nil
	}
}

func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
	config := &ClientConfig{
		exchange:        q.Exchange,
		exchangeType:    q.ExchangeType,
		exchangePassive: q.ExchangePassive,
		encoding:        q.ContentEncoding,
		timeout:         q.Timeout.Duration,
	}

	switch q.ExchangeDurability {
	case "transient":
		config.exchangeDurable = false
	default:
		config.exchangeDurable = true
	}

	config.brokers = q.Brokers
	if len(config.brokers) == 0 {
		config.brokers = []string{q.URL}
	}

	switch q.DeliveryMode {
	case "transient":
		config.deliveryMode = amqp.Transient
	case "persistent":
		config.deliveryMode = amqp.Persistent
	default:
		config.deliveryMode = amqp.Transient
	}

	if len(q.Headers) > 0 {
		config.headers = make(amqp.Table, len(q.Headers))
		for k, v := range q.Headers {
			config.headers[k] = v
		}
	} else {
		// Copy deprecated fields into message header
		config.headers = amqp.Table{
			"database":         q.Database,
			"retention_policy": q.RetentionPolicy,
		}
	}

	if len(q.ExchangeArguments) > 0 {
		config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments))
		for k, v := range q.ExchangeArguments {
			config.exchangeArguments[k] = v
		}
	}

	tlsConfig, err := q.ClientConfig.TLSConfig()
	if err != nil {
		return nil, err
	}
	config.tlsConfig = tlsConfig

	var auth []amqp.Authentication
	if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
		auth = []amqp.Authentication{&externalAuth{}}
	} else if q.Username != "" || q.Password != "" {
		auth = []amqp.Authentication{
			&amqp.PlainAuth{
				Username: q.Username,
				Password: q.Password,
			},
		}
	}
	config.auth = auth

	return config, nil
}

func connect(config *ClientConfig) (Client, error) {
	return Connect(config)
}

func init() {
	outputs.Add("amqp", func() telegraf.Output {
		return &AMQP{
			URL:             DefaultURL,
			ExchangeType:    DefaultExchangeType,
			AuthMethod:      DefaultAuthMethod,
			Database:        DefaultDatabase,
			RetentionPolicy: DefaultRetentionPolicy,
			Timeout:         internal.Duration{Duration: time.Second * 5},
			connect:         connect,
		}
	})
}