package amqp

import (
	"bytes"
	"fmt"
	"log"
	"sync"
	"time"

	// NOTE(review): the original import lines are missing from this file. The
	// paths below are inferred from the package identifiers the code uses
	// (telegraf, internal, outputs, serializers, amqp) — confirm them against
	// the repository before merging.
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers"

	"github.com/streadway/amqp"
)

type AMQP struct {
	// AMQP brokers to send metrics to
	URL string
	// AMQP exchange
	Exchange string
	// Routing Key Tag
	RoutingTag string `toml:"routing_tag"`
	// InfluxDB database
	Database string
	// InfluxDB retention policy
	RetentionPolicy string
	// InfluxDB precision
	Precision string

	// Path to CA file
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl_key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool

	channel *amqp.Channel
	headers amqp.Table

	serializer serializers.Serializer

// Defaults used to populate a new AMQP output when the plugin is registered.
const (
	// DefaultRetentionPolicy is the initial value of AMQP.RetentionPolicy.
	DefaultRetentionPolicy = "default"
	// DefaultDatabase is the initial value of AMQP.Database.
	DefaultDatabase = "telegraf"
	// DefaultPrecision is the initial value of AMQP.Precision.
	DefaultPrecision = "s"
)

// sampleConfig is the example configuration returned by SampleConfig.
var sampleConfig = `
  ## AMQP url
  url = "amqp://localhost:5672/influxdb"
  ## AMQP exchange
  exchange = "telegraf"
  ## Telegraf tag to use as a routing key
  ##  ie, if this tag exists, it's value will be used as the routing key
  routing_tag = "host"

  ## InfluxDB retention policy
  # retention_policy = "default"
  ## InfluxDB database
  # database = "telegraf"
  ## InfluxDB precision
  # precision = "s"

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to output. This can be "influx" or "graphite"
  ## Each data format has it's own unique set of configuration options, read
  ## more about them here:
  data_format = "influx"
`

func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
	a.serializer = serializer

func (q *AMQP) Connect() error {
	defer q.Unlock()

	q.headers = amqp.Table{
		"precision":        q.Precision,
		"database":         q.Database,
		"retention_policy": q.RetentionPolicy,

	var connection *amqp.Connection
	// make new tls config
	tls, err := internal.GetTLSConfig(
		q.SSLCert, q.SSLKey, q.SSLCA, q.InsecureSkipVerify)
	if err != nil {
		return err

	if tls != nil {
		connection, err = amqp.DialTLS(q.URL, tls)
	} else {
		connection, err = amqp.Dial(q.URL)
	if err != nil {
		return err
	channel, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Failed to open a channel: %s", err)

	err = channel.ExchangeDeclare(
		q.Exchange, // name
		"topic",    // type
		true,       // durable
		false,      // delete when unused
		false,      // internal
		false,      // no-wait
		nil,        // arguments
	if err != nil {
		return fmt.Errorf("Failed to declare an exchange: %s", err)
	} = channel
	go func() {
		log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
		log.Printf("Trying to reconnect")
		for err := q.Connect(); err != nil; err = q.Connect() {
			time.Sleep(10 * time.Second)

	return nil

func (q *AMQP) Close() error {

func (q *AMQP) SampleConfig() string {
	return sampleConfig

func (q *AMQP) Description() string {
	return "Configuration for the AMQP server to send metrics to"

func (q *AMQP) Write(metrics []telegraf.Metric) error {
	defer q.Unlock()
	if len(metrics) == 0 {
		return nil
	var outbuf = make(map[string][][]byte)

	for _, metric := range metrics {
		var key string
		if q.RoutingTag != "" {
			if h, ok := metric.Tags()[q.RoutingTag]; ok {
				key = h

		values, err := q.serializer.Serialize(metric)
		if err != nil {
			return err

		for _, value := range values {
			outbuf[key] = append(outbuf[key], []byte(value))

	for key, buf := range outbuf {
		err :=
			q.Exchange, // exchange
			key,        // routing key
			false,      // mandatory
			false,      // immediate
				Headers:     q.headers,
				ContentType: "text/plain",
				Body:        bytes.Join(buf, []byte("\n")),
		if err != nil {
			return fmt.Errorf("FAILED to send amqp message: %s", err)
	return nil

func init() {
	outputs.Add("amqp", func() telegraf.Output {
		return &AMQP{
			Database:        DefaultDatabase,
			Precision:       DefaultPrecision,
			RetentionPolicy: DefaultRetentionPolicy,