package kafka import ( "crypto/tls" "crypto/x509" "errors" "fmt" "github.com/Shopify/sarama" "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/plugins/outputs" "io/ioutil" ) type Kafka struct { // Kafka brokers to send metrics to Brokers []string // Kafka topic Topic string // Routing Key Tag RoutingTag string `toml:"routing_tag"` // TLS client certificate Certificate string // TLS client key Key string // TLS certificate authority CA string // Verfiy SSL certificate chain VerifySsl bool tlsConfig tls.Config producer sarama.SyncProducer } var sampleConfig = ` # URLs of kafka brokers brokers = ["localhost:9092"] # Kafka topic for producer messages topic = "telegraf" # Telegraf tag to use as a routing key # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" # Optional TLS configuration: # Client certificate certificate = "" # Client key key = "" # Certificate authority file ca = "" # Verify SSL certificate chain verify_ssl = false ` func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) { if k.Certificate != "" && k.Key != "" && k.CA != "" { cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key) if err != nil { return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s", err)) } caCert, err := ioutil.ReadFile(k.CA) if err != nil { return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s", err)) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) t = &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, InsecureSkipVerify: k.VerifySsl, } } // will be nil by default if nothing is provided return t, nil } func (k *Kafka) Connect() error { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message tlsConfig, err := createTlsConfiguration(k) if err != nil { return err } if tlsConfig != nil { config.Net.TLS.Config = tlsConfig config.Net.TLS.Enable = true } producer, err := sarama.NewSyncProducer(k.Brokers, config) if err != nil { return err } k.producer = producer return nil } func (k *Kafka) Close() error { return k.producer.Close() } func (k *Kafka) SampleConfig() string { return sampleConfig } func (k *Kafka) Description() string { return "Configuration for the Kafka server to send metrics to" } func (k *Kafka) Write(points []*client.Point) error { if len(points) == 0 { return nil } for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to Kafka value := p.String() m := &sarama.ProducerMessage{ Topic: k.Topic, Value: sarama.StringEncoder(value), } if h, ok := p.Tags()[k.RoutingTag]; ok { m.Key = sarama.StringEncoder(h) } _, _, err := k.producer.SendMessage(m) if err != nil { return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n", err)) } } return nil } func init() { outputs.Add("kafka", func() outputs.Output { return &Kafka{} }) }