added zookeeper_chroot option

added a plugin option zookeeper_chroot to set up the kafka endpoint in zookeeper, which may not be / (default).
This chroot is then configured in the consumergroup config.Zookeeper.Chroot

This is workaround the fact that this plugins does not handle the urls like    "zookeeper_server:port/chroot"
As the peers are stored in an array, it makes no sens to have them beeing URL. Peers should all be members of the same cluster, so they all have the same chroot.
This commit is contained in:
Prune Sebastien THOMAS 2016-03-01 14:11:18 -05:00
parent b2a4d4a018
commit fd21f93358
1 changed files with 4 additions and 0 deletions

View File

@ -17,6 +17,7 @@ type Kafka struct {
ConsumerGroup string
Topics []string
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
// Legacy metric buffer support
@ -48,6 +49,8 @@ var sampleConfig = `
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = "/"
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.acc = acc
config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest