diff --git a/Gopkg.lock b/Gopkg.lock index 3eb640780..ae730556e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -100,12 +100,12 @@ version = "v0.4.9" [[projects]] - digest = "1:322bf7f4bb312294fc551f6e2c82d02f2ab8f94920f4163b3deeb07a8141ac79" + digest = "1:33f56caa9ab45fedc63d3d1d3e342d9f9d00726071f22c67d06b0cd26d49a55e" name = "github.com/Shopify/sarama" packages = ["."] pruneopts = "" - revision = "b12709e6ca29240128c89fe0b30b6a76be42b457" - source = "https://github.com/influxdata/sarama.git" + revision = "" + version = "v1.24.1" [[projects]] digest = "1:f82b8ac36058904227087141017bb82f4b0fc58272990a4cdae3e2d6d222644e" @@ -791,6 +791,20 @@ pruneopts = "" revision = "95032a82bc518f77982ea72343cc1ade730072f0" +[[projects]] + digest = "1:4ceab6231efd01210f2b8b6ab360d480d49c0f44df63841ca0465920a387495d" + name = "github.com/klauspost/compress" + packages = [ + "fse", + "huff0", + "snappy", + "zstd", + "zstd/internal/xxhash", + ] + pruneopts = "" + revision = "4e96aec082898e4dad17d8aca1a7e2d01362ff6c" + version = "v1.9.2" + [[projects]] branch = "master" digest = "1:1ed9eeebdf24aadfbca57eb50e6455bd1d2474525e0f0d4454de8c8e9bc7ee9a" diff --git a/Gopkg.toml b/Gopkg.toml index c6e510641..7ecfae425 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -148,8 +148,7 @@ [[constraint]] name = "github.com/Shopify/sarama" - revision = "b12709e6ca29240128c89fe0b30b6a76be42b457" - source = "https://github.com/influxdata/sarama.git" + version = "1.24.0" [[constraint]] name = "github.com/soniah/gosnmp" diff --git a/etc/telegraf.conf b/etc/telegraf.conf index bab1fb456..5f728579b 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -5116,6 +5116,9 @@ # ## Initial offset position; one of "oldest" or "newest". # # offset = "oldest" # +# ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". +# # balance_strategy = "range" +# # ## Maximum length of a message to consume, in bytes (default 0/unlimited); # ## larger messages are dropped # max_message_len = 1000000 diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index efd3ffad6..b0f2a4798 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -44,6 +44,9 @@ and use the old zookeeper connection method. ## Initial offset position; one of "oldest" or "newest". # offset = "oldest" + ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". + # balance_strategy = "range" + ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped max_message_len = 1000000 diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 997988ca6..39f6f0e2b 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -49,6 +49,9 @@ const sampleConfig = ` ## Initial offset position; one of "oldest" or "newest". # offset = "oldest" + ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky". + # balance_strategy = "range" + ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped max_message_len = 1000000 @@ -86,6 +89,7 @@ type KafkaConsumer struct { MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Offset string `toml:"offset"` + BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` Version string `toml:"version"` @@ -185,6 +189,17 @@ func (k *KafkaConsumer) Init() error { return fmt.Errorf("invalid offset %q", k.Offset) } + switch strings.ToLower(k.BalanceStrategy) { + case "range", "": + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + case "roundrobin": + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + case "sticky": + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + default: + return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy) + } + if k.ConsumerCreator == nil { k.ConsumerCreator = &SaramaCreator{} }