From fd21f93358c6ffbb5929fe0ad9365d27f74d6693 Mon Sep 17 00:00:00 2001 From: Prune Sebastien THOMAS Date: Tue, 1 Mar 2016 14:11:18 -0500 Subject: [PATCH] added zookeeper_chroot option added a plugin option zookeeper_chroot to set up the kafka endpoint in zookeeper, which may not be / (default). This chroot is then configured in the consumergroup config.Zookeeper.Chroot This is workaround the fact that this plugins does not handle the urls like "zookeeper_server:port/chroot" As the peers are stored in an array, it makes no sens to have them beeing URL. Peers should all be members of the same cluster, so they all have the same chroot. --- plugins/inputs/kafka_consumer/kafka_consumer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bc0d225c6..0d2a49f89 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,6 +17,7 @@ type Kafka struct { ConsumerGroup string Topics []string ZookeeperPeers []string + ZookeeperChroot string Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support @@ -48,6 +49,8 @@ var sampleConfig = ` topics = ["telegraf"] ## an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "/" ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot switch strings.ToLower(k.Offset) { case "oldest", "": config.Offsets.Initial = sarama.OffsetOldest