diff --git a/CHANGELOG.md b/CHANGELOG.md index f985f210d..d65b8c67c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. - [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek! - [#494](https://github.com/influxdata/telegraf/pull/494): Graphite output plugin. Thanks @titilambert! +- AMQP SSL support. Thanks @ekini! ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 6f0e0fde3..e1d6302a1 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -2,7 +2,10 @@ package amqp import ( "bytes" + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "sync" "time" @@ -17,6 +20,12 @@ type AMQP struct { URL string // AMQP exchange Exchange string + // path to CA file + SslCa string + // path to host cert file + SslCert string + // path to cert key file + SslKey string // Routing Key Tag RoutingTag string `toml:"routing_tag"` // InfluxDB database @@ -46,6 +55,11 @@ var sampleConfig = ` # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + # Use ssl + #ssl_ca = "/etc/telegraf/ca.pem" + #ssl_cert = "/etc/telegraf/cert.pem" + #ssl_key = "/etc/telegraf/key.pem" + # InfluxDB retention policy #retention_policy = "default" # InfluxDB database @@ -64,7 +78,32 @@ func (q *AMQP) Connect() error { "retention_policy": q.RetentionPolicy, } - connection, err := amqp.Dial(q.URL) + var connection *amqp.Connection + var err error + if q.SslCert != "" && q.SslKey != "" { + // make new tls config + cfg := new(tls.Config) + if q.SslCa != "" { + // create ca pool + cfg.RootCAs = x509.NewCertPool() + + // add self-signed cert + if ca, err := ioutil.ReadFile(q.SslCa); err == nil { + cfg.RootCAs.AppendCertsFromPEM(ca) + } else { + log.Println(err) + } + } + if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil { + cfg.Certificates = append(cfg.Certificates, cert) + } else { + log.Println(err) + } + connection, err = amqp.DialTLS(q.URL, cfg) + + } else { + connection, err = amqp.Dial(q.URL) + } if err != nil { return err }