From c8f00030dd94155e0a94c9f6684e635f2c0ec0a8 Mon Sep 17 00:00:00 2001 From: Rion Date: Fri, 13 Jul 2018 13:53:56 -0700 Subject: [PATCH] Add support for setting kafka client id (#4418) --- plugins/inputs/kafka_consumer/README.md | 3 +++ plugins/inputs/kafka_consumer/kafka_consumer.go | 10 ++++++++++ plugins/outputs/kafka/README.md | 3 +++ plugins/outputs/kafka/kafka.go | 11 +++++++++++ 4 files changed, 27 insertions(+) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 67dbb539e..d794377fa 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -22,6 +22,9 @@ and use the old zookeeper connection method. ## Offset (must be either "oldest" or "newest") offset = "oldest" + ## Optional client id + # client_id = "my_client" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bf74dd5ab..72172bcb6 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,6 +17,7 @@ import ( type Kafka struct { ConsumerGroup string + ClientID string `toml:"client_id"` Topics []string Brokers []string MaxMessageLen int @@ -59,6 +60,9 @@ var sampleConfig = ` brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] + + ## Optional Client id + # client_id = "Telegraf" ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" @@ -114,6 +118,12 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { return err } + if k.ClientID != "" { + config.ClientID = k.ClientID + } else { + config.ClientID = "Telegraf" + } + if tlsConfig != nil { log.Printf("D! TLS Enabled") config.Net.TLS.Config = tlsConfig diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 196e2e914..00544e99c 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -10,6 +10,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Kafka topic for producer messages topic = "telegraf" + ## Optional client id + # client_id = "my_client" + ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 716e06c44..d61aaadd8 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -25,6 +25,8 @@ type ( Brokers []string // Kafka topic Topic string + // Kafka client id + ClientID string `toml:"client_id"` // Kafka topic suffix option TopicSuffix TopicSuffix `toml:"topic_suffix"` // Routing Key Tag @@ -68,6 +70,9 @@ var sampleConfig = ` brokers = ["localhost:9092"] ## Kafka topic for producer messages topic = "telegraf" + + ## Optional Client id + # client_id = "Telegraf" ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. @@ -186,6 +191,12 @@ func (k *Kafka) Connect() error { } config := sarama.NewConfig() + if k.ClientID != "" { + config.ClientID = k.ClientID + } else { + config.ClientID = "Telegraf" + } + config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks) config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec) config.Producer.Retry.Max = k.MaxRetry