kafka: Add support for using TLS authentication for the kafka output

With the advent of Kafka 0.9.0+ it is possible to set up TLS client
certificate based authentication to limit access to Kafka.

Four new configuration variables are specified for setting up the
authentication. If they're not set the behavior stays the same as
before the change.
This commit is contained in:
Hannu Valtonen 2016-01-06 22:50:33 +02:00
parent bdac9b7241
commit 7291ef09d1
1 changed files with 56 additions and 2 deletions

View File

@ -1,12 +1,15 @@
package kafka package kafka
import ( import (
"crypto/tls"
"crypto/x509"
"errors" "errors"
"fmt" "fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client/v2" "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/outputs"
"io/ioutil"
"log"
) )
type Kafka struct { type Kafka struct {
@ -16,7 +19,16 @@ type Kafka struct {
Topic string Topic string
// Routing Key Tag // Routing Key Tag
RoutingTag string `toml:"routing_tag"` RoutingTag string `toml:"routing_tag"`
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Verfiy SSL certificate chain
VerifySsl bool
tlsConfig tls.Config
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@ -28,10 +40,52 @@ var sampleConfig = `
# Telegraf tag to use as a routing key # Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key # ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host" routing_tag = "host"
# Client certificate
certificate = ""
# Client key
key = ""
# Certificate authority file
ca = ""
# Verify SSL certificate chain
verify_ssl = false
` `
func createTlsConfiguration(k *Kafka) (t *tls.Config) {
if k.Certificate != "" && k.Key != "" && k.CA != "" {
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
if err != nil {
log.Fatal(err)
}
caCert, err := ioutil.ReadFile(k.CA)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: k.VerifySsl,
}
}
// will be nil by default if nothing is provided
return t
}
func (k *Kafka) Connect() error { func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil) config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig := createTlsConfiguration(k)
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
producer, err := sarama.NewSyncProducer(k.Brokers, config)
if err != nil { if err != nil {
return err return err
} }