From 381b79e187a4bdb34381c8bcefdd6451718994a7 Mon Sep 17 00:00:00 2001 From: Eugene Dementiev Date: Wed, 16 Mar 2016 21:44:11 +0300 Subject: [PATCH] [amqp output] Allow external auth (cert-based tls auth) --- internal/internal.go | 1 + plugins/outputs/amqp/amqp.go | 32 ++++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 82758e5e8..9c3696c3d 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -109,6 +109,7 @@ func GetTLSConfig( RootCAs: caCertPool, InsecureSkipVerify: InsecureSkipVerify, } + t.BuildNameToCertificate() } else { if InsecureSkipVerify { t.InsecureSkipVerify = true diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 948007117..c9531b2a5 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "log" + "strings" "sync" "time" @@ -20,6 +21,8 @@ type AMQP struct { URL string // AMQP exchange Exchange string + // AMQP Auth method + AuthMethod string // Routing Key Tag RoutingTag string `toml:"routing_tag"` // InfluxDB database @@ -45,7 +48,17 @@ type AMQP struct { serializer serializers.Serializer } +type externalAuth struct{} + +func (a *externalAuth) Mechanism() string { + return "EXTERNAL" +} +func (a *externalAuth) Response() string { + return fmt.Sprintf("\000") +} + const ( + DefaultAuthMethod = "PLAIN" DefaultRetentionPolicy = "default" DefaultDatabase = "telegraf" DefaultPrecision = "s" @@ -56,6 +69,8 @@ var sampleConfig = ` url = "amqp://localhost:5672/influxdb" ## AMQP exchange exchange = "telegraf" + ## Auth method. PLAIN and EXTERNAL are supported + # auth_method = "PLAIN" ## Telegraf tag to use as a routing key ## ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" @@ -103,11 +118,19 @@ func (q *AMQP) Connect() error { return err } - if tls != nil { - connection, err = amqp.DialTLS(q.URL, tls) - } else { - connection, err = amqp.Dial(q.URL) + // parse auth method + var sasl []amqp.Authentication // nil by default + + if strings.ToUpper(q.AuthMethod) == "EXTERNAL" { + sasl = []amqp.Authentication{&externalAuth{}} } + + amqpConf := amqp.Config{ + TLSClientConfig: tls, + SASL: sasl, // if nil, it will be PLAIN + } + + connection, err = amqp.DialConfig(q.URL, amqpConf) if err != nil { return err } @@ -200,6 +223,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ + AuthMethod: DefaultAuthMethod, Database: DefaultDatabase, Precision: DefaultPrecision, RetentionPolicy: DefaultRetentionPolicy,