From f4032fc78de27fb49e3f026f349c1e2f63f8f74b Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 31 Jul 2018 15:09:30 -0700 Subject: [PATCH] Add support for lz4 compression to kafka output (#4492) --- plugins/outputs/kafka/README.md | 10 ++++++++-- plugins/outputs/kafka/kafka.go | 16 ++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 562f3fd5d..bb410a1d5 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -10,9 +10,15 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Kafka topic for producer messages topic = "telegraf" - ## Optional client id + ## Optional Client id # client_id = "Telegraf" + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Of particular interested, lz4 compression + ## requires at least version 0.10.0.0. + ## ex: version = "1.1.0" + # version = "" + ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: @@ -20,7 +26,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## tags - suffix equals to separator + specified tags' values ## interleaved with separator - ## Suffix equals to "_" + measurement's name + ## Suffix equals to "_" + measurement name # [outputs.kafka.topic_suffix] # method = "measurement" # separator = "_" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a99c8e1c2..5fdb8d857 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -38,6 +38,8 @@ type ( // MaxRetry Tag MaxRetry int + Version string `toml:"version"` + // Legacy TLS config options // TLS client certificate Certificate string @@ -74,6 +76,12 @@ var sampleConfig = ` ## Optional Client id # client_id = "Telegraf" + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Of particular interested, lz4 compression + ## requires at least version 0.10.0.0. + ## ex: version = "1.1.0" + # version = "" + ## Optional topic suffix configuration. ## If the section is omitted, no suffix is used. ## Following topic suffix methods are supported: @@ -191,6 +199,14 @@ func (k *Kafka) Connect() error { } config := sarama.NewConfig() + if k.Version != "" { + version, err := sarama.ParseKafkaVersion(k.Version) + if err != nil { + return err + } + config.Version = version + } + if k.ClientID != "" { config.ClientID = k.ClientID } else {