From 7291ef09d1d1702483f3c982f39390c1ab0987a3 Mon Sep 17 00:00:00 2001 From: Hannu Valtonen Date: Wed, 6 Jan 2016 22:50:33 +0200 Subject: [PATCH] kafka: Add support for using TLS authentication for the kafka output With the advent of Kafka 0.9.0+ it is possible to set up TLS client certificate based authentication to limit access to Kafka. Four new configuration variables are specified for setting up the authentication. If they're not set the behavior stays the same as before the change. --- outputs/kafka/kafka.go | 58 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go index fae955210..f59e8a473 100644 --- a/outputs/kafka/kafka.go +++ b/outputs/kafka/kafka.go @@ -1,12 +1,15 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "errors" "fmt" - "github.com/Shopify/sarama" "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" + "io/ioutil" + "log" ) type Kafka struct { @@ -16,7 +19,16 @@ type Kafka struct { Topic string // Routing Key Tag RoutingTag string `toml:"routing_tag"` + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string + // Verfiy SSL certificate chain + VerifySsl bool + tlsConfig tls.Config producer sarama.SyncProducer } @@ -28,10 +40,52 @@ var sampleConfig = ` # Telegraf tag to use as a routing key # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + # Client certificate + certificate = "" + # Client key + key = "" + # Certificate authority file + ca = "" + # Verify SSL certificate chain + verify_ssl = false ` +func createTlsConfiguration(k *Kafka) (t *tls.Config) { + if k.Certificate != "" && k.Key != "" && k.CA != "" { + cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key) + if err != nil { + log.Fatal(err) + } + + caCert, err := ioutil.ReadFile(k.CA) + if err != nil { + log.Fatal(err) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + t = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: k.VerifySsl, + } + } + // will be nil by default if nothing is provided + return t +} + func (k *Kafka) Connect() error { - producer, err := sarama.NewSyncProducer(k.Brokers, nil) + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + tlsConfig := createTlsConfiguration(k) + if tlsConfig != nil { + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + + producer, err := sarama.NewSyncProducer(k.Brokers, config) if err != nil { return err }