Support partition assignement strategy configuration in kafka_consumer (#6688)

This commit is contained in:
Enno Lohmeier
2019-11-27 19:54:29 +01:00
committed by Daniel Nelson
parent c58f0debb1
commit e04bb1e07f
5 changed files with 39 additions and 5 deletions

View File

@@ -44,6 +44,9 @@ and use the old zookeeper connection method.
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000

View File

@@ -49,6 +49,9 @@ const sampleConfig = `
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
@@ -86,6 +89,7 @@ type KafkaConsumer struct {
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Version string `toml:"version"`
@@ -185,6 +189,17 @@ func (k *KafkaConsumer) Init() error {
return fmt.Errorf("invalid offset %q", k.Offset)
}
switch strings.ToLower(k.BalanceStrategy) {
case "range", "":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
default:
return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy)
}
if k.ConsumerCreator == nil {
k.ConsumerCreator = &SaramaCreator{}
}