diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a447e0f..080ce5870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - AMQP SSL support. Thanks @ekini! - [#539](https://github.com/influxdata/telegraf/pull/539): Reload config on SIGHUP. Thanks @titilambert! - [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain! +- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod! ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 8e53cc511..55ef35fb4 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -1,12 +1,14 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "errors" "fmt" - "github.com/Shopify/sarama" "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/plugins/outputs" + "io/ioutil" ) type Kafka struct { @@ -16,8 +18,17 @@ type Kafka struct { Topic string // Routing Key Tag RoutingTag string `toml:"routing_tag"` + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string + // Verfiy SSL certificate chain + VerifySsl bool - producer sarama.SyncProducer + tlsConfig tls.Config + producer sarama.SyncProducer } var sampleConfig = ` @@ -28,10 +39,60 @@ var sampleConfig = ` # Telegraf tag to use as a routing key # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + + # Optional TLS configuration: + # Client certificate + certificate = "" + # Client key + key = "" + # Certificate authority file + ca = "" + # Verify SSL certificate chain + verify_ssl = false ` +func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) { + if k.Certificate != "" && k.Key != "" && k.CA != "" { + cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key) + if err != nil { + return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s", + err)) + } + + caCert, err := ioutil.ReadFile(k.CA) + if err != nil { + return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s", + err)) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + t = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: k.VerifySsl, + } + } + // will be nil by default if nothing is provided + return t, nil +} + func (k *Kafka) Connect() error { - producer, err := sarama.NewSyncProducer(k.Brokers, nil) + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + tlsConfig, err := createTlsConfiguration(k) + if err != nil { + return err + } + + if tlsConfig != nil { + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + + producer, err := sarama.NewSyncProducer(k.Brokers, config) if err != nil { return err }