From e863c6f072bcd4ac8eac71d4812ed1187f425a12 Mon Sep 17 00:00:00 2001 From: Kirk Young Date: Wed, 27 Jul 2016 10:53:04 -0700 Subject: [PATCH] add kafka new consumer support add kafka new consumer support --- Godeps | 1 + plugins/inputs/kafka_consumer/README.md | 41 +++- .../inputs/kafka_consumer/kafka_consumer.go | 178 ++++++++++++++---- 3 files changed, 184 insertions(+), 36 deletions(-) diff --git a/Godeps b/Godeps index 2b4fce555..1dbf189d1 100644 --- a/Godeps +++ b/Godeps @@ -4,6 +4,7 @@ github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 +github.com/bsm/sarama-cluster dc1a390cf63c40d0a20ee9f79c4be120f79110e5 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1 github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2 diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index afdb51e32..84f54cea2 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -6,11 +6,15 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con is used to talk to the Kafka cluster so multiple instances of telegraf can read from the same topic in parallel. -## Configuration +Now supports kafka new consumer (version 0.9+) with TLS + +## Configuration[0.8] ```toml # Read metrics from Kafka topic(s) [[inputs.kafka_consumer]] + ## is new consumer? + new_consumer = false ## topic(s) to consume topics = ["telegraf"] ## an array of Zookeeper connection strings @@ -30,6 +34,41 @@ from the same topic in parallel. data_format = "influx" ``` + + +## Configuration[0.9+] + +```toml +# Read metrics from Kafka topic(s) +[[inputs.kafka_consumer]] + ## is new consumer? + new_consumer = true + ## topic(s) to consume + topics = ["telegraf"] + ## an array of kafka 0.9+ brokers + broker_list = ["localhost:9092"] + ## the name of the consumer group + consumer_group = "telegraf_kafka_consumer_group" + ## Offset (must be either "oldest" or "newest") + offset = "oldest" + + ## Optional SSL Config + ssl_ca = "/etc/telegraf/ca.pem" + ssl_cert = "/etc/telegraf/cert.pem" + ssl_key = "/etc/telegraf/cert.key" + ## Use SSL but skip chain & host verification + insecure_skip_verify = false + + ## Data format to consume. + + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + + + ## Testing Running integration tests requires running Zookeeper & Kafka. See Makefile diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 5600d82a4..072a71a4b 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,39 +1,65 @@ package kafka_consumer import ( + "crypto/tls" "log" "strings" "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" "github.com/wvanbergen/kafka/consumergroup" ) type Kafka struct { - ConsumerGroup string - Topics []string + // new kafka consumer + NewConsumer bool + // common for both versions + ConsumerGroup string + Topics []string + Offset string + + // for 0.8 ZookeeperPeers []string ZookeeperChroot string Consumer *consumergroup.ConsumerGroup + // for 0.9+ + BrokerList []string + Consumer9 *cluster.Consumer + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // Skip SSL verification + InsecureSkipVerify bool + + tlsConfig tls.Config + // Legacy metric buffer support MetricBuffer int // TODO remove PointBuffer, legacy support PointBuffer int - Offset string parser parsers.Parser sync.Mutex // channel for all incoming kafka messages in <-chan *sarama.ConsumerMessage + // channel for all kafka consumer errors - errs <-chan *sarama.ConsumerError + errs <-chan *sarama.ConsumerError + errs9 <-chan error + done chan struct{} // keep the accumulator internally: @@ -45,6 +71,8 @@ type Kafka struct { } var sampleConfig = ` + ## is new consumer? + new_consumer = false ## topic(s) to consume topics = ["telegraf"] ## an array of Zookeeper connection strings @@ -82,41 +110,88 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc - config := consumergroup.NewConfig() - config.Zookeeper.Chroot = k.ZookeeperChroot - switch strings.ToLower(k.Offset) { - case "oldest", "": - config.Offsets.Initial = sarama.OffsetOldest - case "newest": - config.Offsets.Initial = sarama.OffsetNewest - default: - log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", - k.Offset) - config.Offsets.Initial = sarama.OffsetOldest - } + log.Println(k.NewConsumer) - if k.Consumer == nil || k.Consumer.Closed() { - k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( - k.ConsumerGroup, - k.Topics, - k.ZookeeperPeers, - config, - ) - if consumerErr != nil { - return consumerErr + if !k.NewConsumer { + config := consumergroup.NewConfig() + + config.Zookeeper.Chroot = k.ZookeeperChroot + switch strings.ToLower(k.Offset) { + case "oldest", "": + config.Offsets.Initial = sarama.OffsetOldest + case "newest": + config.Offsets.Initial = sarama.OffsetNewest + default: + log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + k.Offset) + config.Offsets.Initial = sarama.OffsetOldest } + if k.Consumer == nil || k.Consumer.Closed() { + k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + k.ConsumerGroup, + k.Topics, + k.ZookeeperPeers, + config, + ) + if consumerErr != nil { + return consumerErr + } + + // Setup message and error channels + k.in = k.Consumer.Messages() + k.errs = k.Consumer.Errors() + } + k.done = make(chan struct{}) + // Start the kafka message reader + go k.receiver() + log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", + k.ZookeeperPeers, k.Topics) + } else { + config := cluster.NewConfig() + + tlsConfig, err := internal.GetTLSConfig(k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify) + if err != nil { + return err + } + + if tlsConfig != nil { + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + + switch strings.ToLower(k.Offset) { + case "oldest", "": + config.Consumer.Offsets.Initial = sarama.OffsetOldest + case "newest": + config.Consumer.Offsets.Initial = sarama.OffsetNewest + default: + log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + k.Offset) + config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + + // TODO: make this configurable + config.Consumer.Return.Errors = true + + if err := config.Validate(); err != nil { + return err + } + + k.Consumer9, err = cluster.NewConsumer(k.BrokerList, k.ConsumerGroup, k.Topics, config) + if err != nil { + return err + } // Setup message and error channels - k.in = k.Consumer.Messages() - k.errs = k.Consumer.Errors() + k.in = k.Consumer9.Messages() + k.errs9 = k.Consumer9.Errors() + k.done = make(chan struct{}) + // Start the kafka message reader for 0.9 + go k.collector() + log.Printf("Started the kafka consumer service with new consumer, brokers: %v, topics: %v\n", + k.BrokerList, k.Topics) } - k.done = make(chan struct{}) - - // Start the kafka message reader - go k.receiver() - log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", - k.ZookeeperPeers, k.Topics) return nil } @@ -151,12 +226,45 @@ func (k *Kafka) receiver() { } } +// this is for kafka new consumer +func (k *Kafka) collector() { + for { + select { + case <-k.done: + return + case err := <-k.errs9: + log.Printf("Kafka Consumer Error: %s\n", err.Error()) + case msg := <-k.in: + metrics, err := k.parser.Parse(msg.Value) + + if err != nil { + log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s", + string(msg.Value), err.Error()) + } + + for _, metric := range metrics { + k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + + if !k.doNotCommitMsgs { + k.Consumer9.MarkOffset(msg, "") + } + } + } +} + func (k *Kafka) Stop() { k.Lock() defer k.Unlock() close(k.done) - if err := k.Consumer.Close(); err != nil { - log.Printf("Error closing kafka consumer: %s\n", err.Error()) + if !k.NewConsumer { + if err := k.Consumer.Close(); err != nil { + log.Printf("Error closing kafka consumer: %s\n", err.Error()) + } + } else { + if err := k.Consumer9.Close(); err != nil { + log.Printf("Error closing kafka consumer: %s\n", err.Error()) + } } }