This commit is contained in:
Hannu Valtonen 2016-01-06 23:29:56 +00:00
commit 8bf61cd891
1 changed files with 56 additions and 2 deletions

View File

@ -1,12 +1,15 @@
package kafka package kafka
import ( import (
"crypto/tls"
"crypto/x509"
"errors" "errors"
"fmt" "fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client/v2" "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/outputs"
"io/ioutil"
"log"
) )
type Kafka struct { type Kafka struct {
@ -16,7 +19,16 @@ type Kafka struct {
Topic string Topic string
// Routing Key Tag // Routing Key Tag
RoutingTag string `toml:"routing_tag"` RoutingTag string `toml:"routing_tag"`
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Verfiy SSL certificate chain
VerifySsl bool
tlsConfig tls.Config
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@ -28,10 +40,52 @@ var sampleConfig = `
# Telegraf tag to use as a routing key # Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key # ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host" routing_tag = "host"
# Client certificate
certificate = ""
# Client key
key = ""
# Certificate authority file
ca = ""
# Verify SSL certificate chain
verify_ssl = false
` `
func createTlsConfiguration(k *Kafka) (t *tls.Config) {
if k.Certificate != "" && k.Key != "" && k.CA != "" {
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
if err != nil {
log.Fatal(err)
}
caCert, err := ioutil.ReadFile(k.CA)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: k.VerifySsl,
}
}
// will be nil by default if nothing is provided
return t
}
func (k *Kafka) Connect() error { func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil) config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig := createTlsConfiguration(k)
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
producer, err := sarama.NewSyncProducer(k.Brokers, config)
if err != nil { if err != nil {
return err return err
} }