added zookeeper_chroot option

added a plugin option zookeeper_chroot to set up the kafka endpoint in zookeeper, which may not be / (default).
This chroot is then configured in the consumergroup config.Zookeeper.Chroot

This is workaround the fact that this plugins does not handle the urls like    "zookeeper_server:port/chroot"
As the peers are stored in an array, it makes no sens to have them beeing URL. Peers should all be members of the same cluster, so they all have the same chroot.
This commit is contained in:
Prune Sebastien THOMAS 2016-03-01 14:11:18 -05:00 committed by Cameron Sparr
parent cd66e203bd
commit 0060df9877
1 changed files with 4 additions and 0 deletions

View File

@ -17,6 +17,7 @@ type Kafka struct {
ConsumerGroup string ConsumerGroup string
Topics []string Topics []string
ZookeeperPeers []string ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup Consumer *consumergroup.ConsumerGroup
// Legacy metric buffer support // Legacy metric buffer support
@ -48,6 +49,8 @@ var sampleConfig = `
topics = ["telegraf"] topics = ["telegraf"]
## an array of Zookeeper connection strings ## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"] zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = "/"
## the name of the consumer group ## the name of the consumer group
consumer_group = "telegraf_metrics_consumers" consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest") ## Offset (must be either "oldest" or "newest")
@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.acc = acc k.acc = acc
config := consumergroup.NewConfig() config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
switch strings.ToLower(k.Offset) { switch strings.ToLower(k.Offset) {
case "oldest", "": case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest config.Offsets.Initial = sarama.OffsetOldest