From a24f7a0a05976dafdbd8194c5dcd31bfe93ecc19 Mon Sep 17 00:00:00 2001 From: Seuf Date: Thu, 8 Jun 2017 03:22:28 +0200 Subject: [PATCH] Add Kafka 0.9+ consumer support (#2487) --- CHANGELOG.md | 1 + Godeps | 3 +- Makefile | 28 ++- docs/LICENSE_OF_DEPENDENCIES.md | 1 + etc/telegraf.conf | 35 +++- internal/config/testdata/telegraf-agent.toml | 20 +- plugins/inputs/all/all.go | 1 + plugins/inputs/kafka_consumer/README.md | 19 +- .../inputs/kafka_consumer/kafka_consumer.go | 105 +++++++--- .../kafka_consumer_integration_test.go | 11 +- .../kafka_consumer/kafka_consumer_test.go | 2 +- .../inputs/kafka_consumer_legacy/README.md | 39 ++++ .../kafka_consumer_legacy.go | 183 ++++++++++++++++++ .../kafka_consumer_legacy_integration_test.go | 96 +++++++++ .../kafka_consumer_legacy_test.go | 150 ++++++++++++++ 15 files changed, 637 insertions(+), 57 deletions(-) create mode 100644 plugins/inputs/kafka_consumer_legacy/README.md create mode 100644 plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go create mode 100644 plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go create mode 100644 plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2216e6a8f..69a5fefee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Release Notes ### Features +- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support - [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin - [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted - [#2817](https://github.com/influxdata/telegraf/pull/2817): Added timeout option to IPMI sensor plugin diff --git a/Godeps b/Godeps index da3ccdc35..b53f71848 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,5 @@ collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 -github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3 +github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093 github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee +github.com/bsm/sarama-cluster ccdc0803695fbce22f1706d04ded46cd518fd832 github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 diff --git a/Makefile b/Makefile index d2bad656d..1f4233ab0 100644 --- a/Makefile +++ b/Makefile @@ -46,11 +46,15 @@ prepare-windows: # Run all docker containers necessary for unit tests docker-run: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ADVERTISED_PORT=9092 \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e 
KAFKA_CREATE_TOPICS="test:1:1" \ + -p "9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql docker run --name memcached -p "11211:11211" -d memcached @@ -65,11 +69,15 @@ docker-run: # Run docker containers necessary for CircleCI unit tests docker-run-circle: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ADVERTISED_PORT=9092 \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_CREATE_TOPICS="test:1:1" \ + -p "9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt @@ -78,8 +86,8 @@ docker-run-circle: # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch - -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch + -docker kill nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch + -docker rm nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 3b4d0a34d..c62c262c0 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -10,6 +10,7 @@ works: - github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) - github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE) +- github.com/bsm/sarama-cluster [MIT](https://github.com/bsm/sarama-cluster/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE) - github.com/couchbase/go-couchbase [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE) - github.com/couchbase/gomemcached [MIT](https://github.com/couchbase/gomemcached/blob/master/LICENSE) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index d455b10af..5c105a30a 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -2198,11 +2198,42 @@ # ## 0 means to use the default of 65536 bytes (64 kibibytes) # max_line_size = 0 - -# # Read metrics from Kafka topic(s) +# # Read metrics from Kafka 0.9+ topic(s) # [[inputs.kafka_consumer]] # ## topic(s) to consume # topics = ["telegraf"] +# ## kafka servers +# brokers = ["localhost:9092"] +# ## the name of the consumer group +# consumer_group = "telegraf_metrics_consumers" +# ## Offset (must be either "oldest" or "newest") +# offset = "oldest" +# +# ## Optional SSL Config +# # ssl_ca = "/etc/telegraf/ca.pem" +# # ssl_cert = "/etc/telegraf/cert.pem" +# # ssl_key = "/etc/telegraf/key.pem" +# ## Use SSL but skip chain & host verification +# # insecure_skip_verify = false +# +# ## Optional SASL Config +# # sasl_username = "kafka" +# # 
sasl_password = "secret" +# +# ## Data format to consume. +# ## Each data format has its own unique set of configuration options, read +# ## more about them here: +# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +# data_format = "influx" +# +# ## Maximum length of a message to consume, in bytes (default 0/unlimited); +# ## larger messages are dropped +# max_message_len = 65536 + +# # Read metrics from Kafka (0.8 or less) topic(s) +# [[inputs.kafka_consumer_legacy]] +# ## topic(s) to consume +# topics = ["telegraf"] # ## an array of Zookeeper connection strings # zookeeper_peers = ["localhost:2181"] # ## Zookeeper Chroot diff --git a/internal/config/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml index 5cf82af76..9da79605f 100644 --- a/internal/config/testdata/telegraf-agent.toml +++ b/internal/config/testdata/telegraf-agent.toml @@ -143,19 +143,31 @@ [[inputs.diskio]] # no configuration -# read metrics from a Kafka topic +# read metrics from a Kafka 0.9+ topic [[inputs.kafka_consumer]] - # topic(s) to consume + ## kafka brokers + brokers = ["localhost:9092"] + ## topic(s) to consume + topics = ["telegraf"] + ## the name of the consumer group + consumer_group = "telegraf_metrics_consumers" + ## Offset (must be either "oldest" or "newest") + offset = "oldest" + +# read metrics from a Kafka legacy topic +[[inputs.kafka_consumer_legacy]] + ## topic(s) to consume topics = ["telegraf"] # an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] - # the name of the consumer group + ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" # Maximum number of points to buffer between collection intervals point_buffer = 100000 - # Offset (must be either "oldest" or "newest") + ## Offset (must be either "oldest" or "newest") offset = "oldest" + # Read metrics from a LeoFS Server via SNMP [[inputs.leofs]] # An array of URI to gather stats about LeoFS. diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 10af14864..456852ff2 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -35,6 +35,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" + _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 31976788b..695001274 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -6,6 +6,9 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con is used to talk to the Kafka cluster so multiple instances of telegraf can read from the same topic in parallel. +For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin +and use the old zookeeper connection method. + ## Configuration ```toml @@ -13,15 +16,23 @@ from the same topic in parallel. 
[[inputs.kafka_consumer]] ## topic(s) to consume topics = ["telegraf"] - ## an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] - ## Zookeeper Chroot - zookeeper_chroot = "" + brokers = ["localhost:9092"] ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") offset = "oldest" + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Optional SASL Config + # sasl_username = "kafka" + # sasl_password = "secret" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 2f6933db0..4e4715617 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -7,20 +7,35 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" - "github.com/wvanbergen/kafka/consumergroup" + cluster "github.com/bsm/sarama-cluster" ) type Kafka struct { - ConsumerGroup string - Topics []string - MaxMessageLen int - ZookeeperPeers []string - ZookeeperChroot string - Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + Brokers []string + MaxMessageLen int + + Cluster *cluster.Consumer + + // Verify Kafka SSL Certificate + InsecureSkipVerify bool + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // SASL Username + SASLUsername string `toml:"sasl_username"` + // SASL Password + SASLPassword string `toml:"sasl_password"` // Legacy metric buffer support MetricBuffer int @@ -47,12 +62,22 @@ type Kafka struct { } var sampleConfig = ` + ## kafka servers + brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] - ## an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] - ## Zookeeper Chroot - zookeeper_chroot = "" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Optional SASL Config + # sasl_username = "kafka" + # sasl_password = "secret" + ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) { func (k *Kafka) Start(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - var consumerErr error + var clusterErr error k.acc = acc - config := consumergroup.NewConfig() - config.Zookeeper.Chroot = k.ZookeeperChroot + config := cluster.NewConfig() + config.Consumer.Return.Errors = true + + tlsConfig, err := internal.GetTLSConfig( + k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify) + if err != nil { + return err + } + + if tlsConfig != nil { + log.Printf("D! TLS Enabled") + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + if k.SASLUsername != "" && k.SASLPassword != "" { + log.Printf("D! 
Using SASL auth with username '%s',", + k.SASLUsername) + config.Net.SASL.User = k.SASLUsername + config.Net.SASL.Password = k.SASLPassword + config.Net.SASL.Enable = true + } + switch strings.ToLower(k.Offset) { case "oldest", "": - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest case "newest": - config.Offsets.Initial = sarama.OffsetNewest + config.Consumer.Offsets.Initial = sarama.OffsetNewest default: log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset) - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest } - if k.Consumer == nil || k.Consumer.Closed() { - k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + if k.Cluster == nil { + k.Cluster, clusterErr = cluster.NewConsumer( + k.Brokers, k.ConsumerGroup, k.Topics, - k.ZookeeperPeers, config, ) - if consumerErr != nil { - return consumerErr + + if clusterErr != nil { + log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) + return clusterErr } // Setup message and error channels - k.in = k.Consumer.Messages() - k.errs = k.Consumer.Errors() + k.in = k.Cluster.Messages() + k.errs = k.Cluster.Errors() } k.done = make(chan struct{}) - // Start the kafka message reader go k.receiver() - log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", - k.ZookeeperPeers, k.Topics) + log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) return nil } @@ -156,7 +203,7 @@ func (k *Kafka) receiver() { // TODO(cam) this locking can be removed if this PR gets merged: // https://github.com/wvanbergen/kafka/pull/84 k.Lock() - k.Consumer.CommitUpto(msg) + k.Cluster.MarkOffset(msg, "") k.Unlock() } } @@ -167,7 +214,7 @@ func (k *Kafka) Stop() { k.Lock() defer k.Unlock() close(k.done) - if err := k.Consumer.Close(); err != nil { + if err := k.Cluster.Close(); err != nil { k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error())) } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index 41ce10157..a145a938a 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) { } brokerPeers := []string{testutil.GetLocalHost() + ":9092"} - zkPeers := []string{testutil.GetLocalHost() + ":2181"} testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) // Send a Kafka message to the kafka host @@ -36,11 +35,11 @@ func TestReadsMetricsFromKafka(t *testing.T) { // Start the Kafka Consumer k := &Kafka{ - ConsumerGroup: "telegraf_test_consumers", - Topics: []string{testTopic}, - ZookeeperPeers: zkPeers, - PointBuffer: 100000, - Offset: "oldest", + ConsumerGroup: "telegraf_test_consumers", + Topics: []string{testTopic}, + Brokers: brokerPeers, + PointBuffer: 100000, + Offset: "oldest", } p, _ := parsers.NewInfluxParser() k.SetParser(p) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 5519dd0d1..9a585d6ed 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { k := Kafka{ ConsumerGroup: "test", Topics: []string{"telegraf"}, - 
ZookeeperPeers: []string{"localhost:2181"}, + Brokers: []string{"localhost:9092"}, Offset: "oldest", in: in, doNotCommitMsgs: true, diff --git a/plugins/inputs/kafka_consumer_legacy/README.md b/plugins/inputs/kafka_consumer_legacy/README.md new file mode 100644 index 000000000..31976788b --- /dev/null +++ b/plugins/inputs/kafka_consumer_legacy/README.md @@ -0,0 +1,39 @@ +# Kafka Consumer Input Plugin + +The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka +topic and adds messages to InfluxDB. The plugin assumes messages follow the +line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup) +is used to talk to the Kafka cluster so multiple instances of telegraf can read +from the same topic in parallel. + +## Configuration + +```toml +# Read metrics from Kafka topic(s) +[[inputs.kafka_consumer]] + ## topic(s) to consume + topics = ["telegraf"] + ## an array of Zookeeper connection strings + zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "" + ## the name of the consumer group + consumer_group = "telegraf_metrics_consumers" + ## Offset (must be either "oldest" or "newest") + offset = "oldest" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 65536 +``` + +## Testing + +Running integration tests requires running Zookeeper & Kafka. See Makefile +for kafka container command. diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go new file mode 100644 index 000000000..d9558d5bd --- /dev/null +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy.go @@ -0,0 +1,183 @@ +package kafka_consumer_legacy + +import ( + "fmt" + "log" + "strings" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + + "github.com/Shopify/sarama" + "github.com/wvanbergen/kafka/consumergroup" +) + +type Kafka struct { + ConsumerGroup string + Topics []string + MaxMessageLen int + ZookeeperPeers []string + ZookeeperChroot string + Consumer *consumergroup.ConsumerGroup + + // Legacy metric buffer support + MetricBuffer int + // TODO remove PointBuffer, legacy support + PointBuffer int + + Offset string + parser parsers.Parser + + sync.Mutex + + // channel for all incoming kafka messages + in <-chan *sarama.ConsumerMessage + // channel for all kafka consumer errors + errs <-chan error + done chan struct{} + + // keep the accumulator internally: + acc telegraf.Accumulator + + // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer + // this is mostly for test purposes, but there may be a use-case for it later. + doNotCommitMsgs bool +} + +var sampleConfig = ` + ## topic(s) to consume + topics = ["telegraf"] + ## an array of Zookeeper connection strings + zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "" + ## the name of the consumer group + consumer_group = "telegraf_metrics_consumers" + ## Offset (must be either "oldest" or "newest") + offset = "oldest" + + ## Data format to consume. 
+ ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 65536 +` + +func (k *Kafka) SampleConfig() string { + return sampleConfig +} + +func (k *Kafka) Description() string { + return "Read metrics from Kafka topic(s)" +} + +func (k *Kafka) SetParser(parser parsers.Parser) { + k.parser = parser +} + +func (k *Kafka) Start(acc telegraf.Accumulator) error { + k.Lock() + defer k.Unlock() + var consumerErr error + + k.acc = acc + + config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot + switch strings.ToLower(k.Offset) { + case "oldest", "": + config.Offsets.Initial = sarama.OffsetOldest + case "newest": + config.Offsets.Initial = sarama.OffsetNewest + default: + log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + k.Offset) + config.Offsets.Initial = sarama.OffsetOldest + } + + if k.Consumer == nil || k.Consumer.Closed() { + k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + k.ConsumerGroup, + k.Topics, + k.ZookeeperPeers, + config, + ) + if consumerErr != nil { + return consumerErr + } + + // Setup message and error channels + k.in = k.Consumer.Messages() + k.errs = k.Consumer.Errors() + } + + k.done = make(chan struct{}) + + // Start the kafka message reader + go k.receiver() + log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", + k.ZookeeperPeers, k.Topics) + return nil +} + +// receiver() reads all incoming messages from the consumer, and parses them into +// influxdb metric points. 
+func (k *Kafka) receiver() { + for { + select { + case <-k.done: + return + case err := <-k.errs: + if err != nil { + k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err)) + } + case msg := <-k.in: + if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen { + k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)", + len(msg.Value), k.MaxMessageLen)) + } else { + metrics, err := k.parser.Parse(msg.Value) + if err != nil { + k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s", + string(msg.Value), err.Error())) + } + for _, metric := range metrics { + k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + } + + if !k.doNotCommitMsgs { + // TODO(cam) this locking can be removed if this PR gets merged: + // https://github.com/wvanbergen/kafka/pull/84 + k.Lock() + k.Consumer.CommitUpto(msg) + k.Unlock() + } + } + } +} + +func (k *Kafka) Stop() { + k.Lock() + defer k.Unlock() + close(k.done) + if err := k.Consumer.Close(); err != nil { + k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error())) + } +} + +func (k *Kafka) Gather(acc telegraf.Accumulator) error { + return nil +} + +func init() { + inputs.Add("kafka_consumer_legacy", func() telegraf.Input { + return &Kafka{} + }) +} diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go new file mode 100644 index 000000000..3fe80c4a4 --- /dev/null +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go @@ -0,0 +1,96 @@ +package kafka_consumer_legacy + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/parsers" +) + +func TestReadsMetricsFromKafka(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + brokerPeers := []string{testutil.GetLocalHost() + ":9092"} + zkPeers := []string{testutil.GetLocalHost() + ":2181"} + testTopic := fmt.Sprintf("telegraf_test_topic_legacy_%d", time.Now().Unix()) + + // Send a Kafka message to the kafka host + msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257\n" + producer, err := sarama.NewSyncProducer(brokerPeers, nil) + require.NoError(t, err) + _, _, err = producer.SendMessage( + &sarama.ProducerMessage{ + Topic: testTopic, + Value: sarama.StringEncoder(msg), + }) + require.NoError(t, err) + defer producer.Close() + + // Start the Kafka Consumer + k := &Kafka{ + ConsumerGroup: "telegraf_test_consumers", + Topics: []string{testTopic}, + ZookeeperPeers: zkPeers, + PointBuffer: 100000, + Offset: "oldest", + } + p, _ := parsers.NewInfluxParser() + k.SetParser(p) + + // Verify that we can now gather the sent message + var acc testutil.Accumulator + + // Sanity check + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := k.Start(&acc); err != nil { + t.Fatal(err.Error()) + } else { + defer k.Stop() + } + + waitForPoint(&acc, t) + + // Gather points + err = acc.GatherError(k.Gather) + require.NoError(t, err) + if len(acc.Metrics) == 1 { + point := acc.Metrics[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": 
"us-west", + }, point.Tags) + assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) + } else { + t.Errorf("No points found in accumulator, expected 1") + } +} + +// Waits for the metric that was sent to the kafka broker to arrive at the kafka +// consumer +func waitForPoint(acc *testutil.Accumulator, t *testing.T) { + // Give the kafka container up to 2 seconds to get the point to the consumer + ticker := time.NewTicker(5 * time.Millisecond) + counter := 0 + for { + select { + case <-ticker.C: + counter++ + if counter > 1000 { + t.Fatal("Waited for 5s, point never arrived to consumer") + } else if acc.NFields() == 1 { + return + } + } + } +} diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go new file mode 100644 index 000000000..630aca163 --- /dev/null +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go @@ -0,0 +1,150 @@ +package kafka_consumer_legacy + +import ( + "strings" + "testing" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n" +) + +func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { + in := make(chan *sarama.ConsumerMessage, 1000) + k := Kafka{ + ConsumerGroup: "test", + Topics: []string{"telegraf"}, + ZookeeperPeers: []string{"localhost:2181"}, + Offset: "oldest", + in: in, + doNotCommitMsgs: true, + errs: make(chan error, 1000), + done: make(chan struct{}), + } + return &k, in +} + +// Test that the parser parses kafka messages into points +func TestRunParser(t *testing.T) { + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() + in <- saramaMsg(testMsg) + acc.Wait(1) + + assert.Equal(t, acc.NFields(), 1) +} + +// Test that the parser ignores invalid messages +func TestRunParserInvalidMsg(t *testing.T) { + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() + in <- saramaMsg(invalidMsg) + acc.WaitError(1) + + assert.Equal(t, acc.NFields(), 0) +} + +// Test that overlong messages are dropped +func TestDropOverlongMsg(t *testing.T) { + const maxMessageLen = 64 * 1024 + k, in := newTestKafka() + k.MaxMessageLen = maxMessageLen + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + overlongMsg := strings.Repeat("v", maxMessageLen+1) + + go k.receiver() + in <- saramaMsg(overlongMsg) + acc.WaitError(1) + + assert.Equal(t, acc.NFields(), 0) +} + +// Test that the parser parses kafka messages into points +func TestRunParserAndGather(t *testing.T) { + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() + in <- saramaMsg(testMsg) + acc.Wait(1) + + acc.GatherError(k.Gather) + + assert.Equal(t, acc.NFields(), 1) + acc.AssertContainsFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses kafka messages into points +func TestRunParserAndGatherGraphite(t *testing.T) { + k, in := 
newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + + k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgGraphite) + acc.Wait(1) + + acc.GatherError(k.Gather) + + assert.Equal(t, acc.NFields(), 1) + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses kafka messages into points +func TestRunParserAndGatherJSON(t *testing.T) { + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + + k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgJSON) + acc.Wait(1) + + acc.GatherError(k.Gather) + + assert.Equal(t, acc.NFields(), 2) + acc.AssertContainsFields(t, "kafka_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + +func saramaMsg(val string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Key: nil, + Value: []byte(val), + Offset: 0, + Partition: 0, + } +}
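
For quick reference, below is a condensed side-by-side `telegraf.conf` sketch drawn from the sample configurations in this patch: the new broker-based `kafka_consumer` plugin for Kafka 0.9+ and the Zookeeper-based `kafka_consumer_legacy` plugin for 0.8 and earlier. The broker/Zookeeper addresses and topic name are placeholders.

```toml
# Kafka 0.9+ : talks directly to the brokers; optional SSL/SASL settings
# (ssl_ca, ssl_cert, ssl_key, sasl_username, sasl_password) are also supported.
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["telegraf"]
  consumer_group = "telegraf_metrics_consumers"
  offset = "oldest"
  data_format = "influx"
  max_message_len = 65536

# Kafka 0.8 and earlier: the legacy plugin keeps the old Zookeeper-based
# consumer group connection.
[[inputs.kafka_consumer_legacy]]
  zookeeper_peers = ["localhost:2181"]
  zookeeper_chroot = ""
  topics = ["telegraf"]
  consumer_group = "telegraf_metrics_consumers"
  offset = "oldest"
  data_format = "influx"
  max_message_len = 65536
```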