diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 26ebca39d..efd3ffad6 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -23,8 +23,7 @@ and use the old zookeeper connection method. # client_id = "Telegraf" ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Of particular interest, lz4 compression - ## requires at least version 0.10.0.0. + ## Kafka features and APIs. Must be 0.10.2.0 or greater. ## ex: version = "1.1.0" # version = "" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 10a6251be..2703bb52d 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -28,8 +28,7 @@ const sampleConfig = ` # client_id = "Telegraf" ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Of particular interest, lz4 compression - ## requires at least version 0.10.0.0. + ## Kafka features and APIs. Must be 0.10.2.0 or greater. ## ex: version = "1.1.0" # version = "" @@ -143,11 +142,15 @@ func (k *KafkaConsumer) Init() error { config := sarama.NewConfig() config.Consumer.Return.Errors = true + // Kafka version 0.10.2.0 is required for consumer groups. + config.Version = sarama.V0_10_2_0 + if k.Version != "" { version, err := sarama.ParseKafkaVersion(k.Version) if err != nil { return err } + config.Version = version }