[amqp output] Allow external auth (cert-based tls auth)
This commit is contained in:
parent
f8e9fafda3
commit
381b79e187
|
@ -109,6 +109,7 @@ func GetTLSConfig(
|
||||||
RootCAs: caCertPool,
|
RootCAs: caCertPool,
|
||||||
InsecureSkipVerify: InsecureSkipVerify,
|
InsecureSkipVerify: InsecureSkipVerify,
|
||||||
}
|
}
|
||||||
|
t.BuildNameToCertificate()
|
||||||
} else {
|
} else {
|
||||||
if InsecureSkipVerify {
|
if InsecureSkipVerify {
|
||||||
t.InsecureSkipVerify = true
|
t.InsecureSkipVerify = true
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -20,6 +21,8 @@ type AMQP struct {
|
||||||
URL string
|
URL string
|
||||||
// AMQP exchange
|
// AMQP exchange
|
||||||
Exchange string
|
Exchange string
|
||||||
|
// AMQP Auth method
|
||||||
|
AuthMethod string
|
||||||
// Routing Key Tag
|
// Routing Key Tag
|
||||||
RoutingTag string `toml:"routing_tag"`
|
RoutingTag string `toml:"routing_tag"`
|
||||||
// InfluxDB database
|
// InfluxDB database
|
||||||
|
@ -45,7 +48,17 @@ type AMQP struct {
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type externalAuth struct{}
|
||||||
|
|
||||||
|
func (a *externalAuth) Mechanism() string {
|
||||||
|
return "EXTERNAL"
|
||||||
|
}
|
||||||
|
func (a *externalAuth) Response() string {
|
||||||
|
return fmt.Sprintf("\000")
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
DefaultAuthMethod = "PLAIN"
|
||||||
DefaultRetentionPolicy = "default"
|
DefaultRetentionPolicy = "default"
|
||||||
DefaultDatabase = "telegraf"
|
DefaultDatabase = "telegraf"
|
||||||
DefaultPrecision = "s"
|
DefaultPrecision = "s"
|
||||||
|
@ -56,6 +69,8 @@ var sampleConfig = `
|
||||||
url = "amqp://localhost:5672/influxdb"
|
url = "amqp://localhost:5672/influxdb"
|
||||||
## AMQP exchange
|
## AMQP exchange
|
||||||
exchange = "telegraf"
|
exchange = "telegraf"
|
||||||
|
## Auth method. PLAIN and EXTERNAL are supported
|
||||||
|
# auth_method = "PLAIN"
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, it's value will be used as the routing key
|
## ie, if this tag exists, it's value will be used as the routing key
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
|
@ -103,11 +118,19 @@ func (q *AMQP) Connect() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if tls != nil {
|
// parse auth method
|
||||||
connection, err = amqp.DialTLS(q.URL, tls)
|
var sasl []amqp.Authentication // nil by default
|
||||||
} else {
|
|
||||||
connection, err = amqp.Dial(q.URL)
|
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
|
||||||
|
sasl = []amqp.Authentication{&externalAuth{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
amqpConf := amqp.Config{
|
||||||
|
TLSClientConfig: tls,
|
||||||
|
SASL: sasl, // if nil, it will be PLAIN
|
||||||
|
}
|
||||||
|
|
||||||
|
connection, err = amqp.DialConfig(q.URL, amqpConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -200,6 +223,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("amqp", func() telegraf.Output {
|
outputs.Add("amqp", func() telegraf.Output {
|
||||||
return &AMQP{
|
return &AMQP{
|
||||||
|
AuthMethod: DefaultAuthMethod,
|
||||||
Database: DefaultDatabase,
|
Database: DefaultDatabase,
|
||||||
Precision: DefaultPrecision,
|
Precision: DefaultPrecision,
|
||||||
RetentionPolicy: DefaultRetentionPolicy,
|
RetentionPolicy: DefaultRetentionPolicy,
|
||||||
|
|
Loading…
Reference in New Issue