From 9bdb3992d58fcb81e5d7b324a4dade6d404ee144 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 30 Jul 2019 21:33:29 -0700 Subject: [PATCH] Require Kafka 0.10.2.0 or later in kafka_consumer (#6181) --- plugins/inputs/kafka_consumer/README.md | 3 +-- plugins/inputs/kafka_consumer/kafka_consumer.go | 7 +++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 26ebca39d..efd3ffad6 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -23,8 +23,7 @@ and use the old zookeeper connection method. # client_id = "Telegraf" ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Of particular interest, lz4 compression - ## requires at least version 0.10.0.0. + ## Kafka features and APIs. Must be 0.10.2.0 or greater. ## ex: version = "1.1.0" # version = "" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 10a6251be..2703bb52d 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -28,8 +28,7 @@ const sampleConfig = ` # client_id = "Telegraf" ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Of particular interest, lz4 compression - ## requires at least version 0.10.0.0. + ## Kafka features and APIs. Must be 0.10.2.0 or greater. ## ex: version = "1.1.0" # version = "" @@ -143,11 +142,15 @@ func (k *KafkaConsumer) Init() error { config := sarama.NewConfig() config.Consumer.Return.Errors = true + // Kafka version 0.10.2.0 is required for consumer groups. + config.Version = sarama.V0_10_2_0 + if k.Version != "" { version, err := sarama.ParseKafkaVersion(k.Version) if err != nil { return err } + config.Version = version }