diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bc0d225c6..0d2a49f89 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,6 +17,7 @@ type Kafka struct { ConsumerGroup string Topics []string ZookeeperPeers []string + ZookeeperChroot string Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support @@ -48,6 +49,8 @@ var sampleConfig = ` topics = ["telegraf"] ## an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "/" ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot switch strings.ToLower(k.Offset) { case "oldest", "": config.Offsets.Initial = sarama.OffsetOldest