Add Kafka 0.9+ consumer support (#2487)

This commit is contained in:
Seuf 2017-06-08 03:22:28 +02:00 committed by Daniel Nelson
parent 8e309f864a
commit a24f7a0a05
15 changed files with 637 additions and 57 deletions

View File

@ -3,6 +3,7 @@
### Release Notes ### Release Notes
### Features ### Features
- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
- [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin - [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
- [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted - [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
- [#2817](https://github.com/influxdata/telegraf/pull/2817): Added timeout option to IPMI sensor plugin - [#2817](https://github.com/influxdata/telegraf/pull/2817): Added timeout option to IPMI sensor plugin

3
Godeps
View File

@ -1,5 +1,5 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3 github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
github.com/bsm/sarama-cluster ccdc0803695fbce22f1706d04ded46cd518fd832
github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363

View File

@ -46,11 +46,15 @@ prepare-windows:
# Run all docker containers necessary for unit tests # Run all docker containers necessary for unit tests
docker-run: docker-run:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \ docker run --name kafka \
-e ADVERTISED_HOST=localhost \ --link zookeeper:zookeeper \
-e ADVERTISED_PORT=9092 \ -e KAFKA_ADVERTISED_HOST_NAME=localhost \
-p "2181:2181" -p "9092:9092" \ -e KAFKA_ADVERTISED_PORT=9092 \
-d spotify/kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
docker run --name memcached -p "11211:11211" -d memcached docker run --name memcached -p "11211:11211" -d memcached
@ -65,11 +69,15 @@ docker-run:
# Run docker containers necessary for CircleCI unit tests # Run docker containers necessary for CircleCI unit tests
docker-run-circle: docker-run-circle:
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper
docker run --name kafka \ docker run --name kafka \
-e ADVERTISED_HOST=localhost \ --link zookeeper:zookeeper \
-e ADVERTISED_PORT=9092 \ -e KAFKA_ADVERTISED_HOST_NAME=localhost \
-p "2181:2181" -p "9092:9092" \ -e KAFKA_ADVERTISED_PORT=9092 \
-d spotify/kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
-p "9092:9092" \
-d wurstmeister/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
@ -78,8 +86,8 @@ docker-run-circle:
# Kill all docker containers, ignore errors # Kill all docker containers, ignore errors
docker-kill: docker-kill:
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch -docker kill nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch -docker rm nsq aerospike redis rabbitmq postgres memcached mysql zookeeper kafka mqtt riemann nats elasticsearch
# Run full unit tests using docker containers (includes setup and teardown) # Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run test: vet docker-kill docker-run

View File

@ -10,6 +10,7 @@ works:
- github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) - github.com/aws/aws-sdk-go [APACHE](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
- github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/beorn7/perks [MIT](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE) - github.com/boltdb/bolt [MIT](https://github.com/boltdb/bolt/blob/master/LICENSE)
- github.com/bsm/sarama-cluster [MIT](https://github.com/bsm/sarama-cluster/blob/master/LICENSE)
- github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT](https://github.com/cenkalti/backoff/blob/master/LICENSE)
- github.com/couchbase/go-couchbase [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE) - github.com/couchbase/go-couchbase [MIT](https://github.com/couchbase/go-couchbase/blob/master/LICENSE)
- github.com/couchbase/gomemcached [MIT](https://github.com/couchbase/gomemcached/blob/master/LICENSE) - github.com/couchbase/gomemcached [MIT](https://github.com/couchbase/gomemcached/blob/master/LICENSE)

View File

@ -2198,11 +2198,42 @@
# ## 0 means to use the default of 65536 bytes (64 kibibytes) # ## 0 means to use the default of 65536 bytes (64 kibibytes)
# max_line_size = 0 # max_line_size = 0
# # Read metrics from Kafka 0.9+ topic(s)
# # Read metrics from Kafka topic(s)
# [[inputs.kafka_consumer]] # [[inputs.kafka_consumer]]
# ## topic(s) to consume # ## topic(s) to consume
# topics = ["telegraf"] # topics = ["telegraf"]
# ## kafka servers
# brokers = ["localhost:9092"]
# ## the name of the consumer group
# consumer_group = "telegraf_metrics_consumers"
# ## Offset (must be either "oldest" or "newest")
# offset = "oldest"
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to consume.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
#
# ## Maximum length of a message to consume, in bytes (default 0/unlimited);
# ## larger messages are dropped
# max_message_len = 65536
# # Read metrics from Kafka (0.8 or less) topic(s)
# [[inputs.kafka_consumer_legacy]]
# ## topic(s) to consume
# topics = ["telegraf"]
# ## an array of Zookeeper connection strings # ## an array of Zookeeper connection strings
# zookeeper_peers = ["localhost:2181"] # zookeeper_peers = ["localhost:2181"]
# ## Zookeeper Chroot # ## Zookeeper Chroot

View File

@ -143,19 +143,31 @@
[[inputs.diskio]] [[inputs.diskio]]
# no configuration # no configuration
# read metrics from a Kafka topic # read metrics from a Kafka 0.9+ topic
[[inputs.kafka_consumer]] [[inputs.kafka_consumer]]
# topic(s) to consume ## kafka brokers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
# read metrics from a Kafka legacy topic
[[inputs.kafka_consumer_legacy]]
## topic(s) to consume
topics = ["telegraf"] topics = ["telegraf"]
# an array of Zookeeper connection strings # an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"] zookeeper_peers = ["localhost:2181"]
# the name of the consumer group ## the name of the consumer group
consumer_group = "telegraf_metrics_consumers" consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals # Maximum number of points to buffer between collection intervals
point_buffer = 100000 point_buffer = 100000
# Offset (must be either "oldest" or "newest") ## Offset (must be either "oldest" or "newest")
offset = "oldest" offset = "oldest"
# Read metrics from a LeoFS Server via SNMP # Read metrics from a LeoFS Server via SNMP
[[inputs.leofs]] [[inputs.leofs]]
# An array of URI to gather stats about LeoFS. # An array of URI to gather stats about LeoFS.

View File

@ -35,6 +35,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/leofs"

View File

@ -6,6 +6,9 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con
is used to talk to the Kafka cluster so multiple instances of telegraf can read is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel. from the same topic in parallel.
For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin
and use the old zookeeper connection method.
## Configuration ## Configuration
```toml ```toml
@ -13,15 +16,23 @@ from the same topic in parallel.
[[inputs.kafka_consumer]] [[inputs.kafka_consumer]]
## topic(s) to consume ## topic(s) to consume
topics = ["telegraf"] topics = ["telegraf"]
## an array of Zookeeper connection strings brokers = ["localhost:9092"]
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
## the name of the consumer group ## the name of the consumer group
consumer_group = "telegraf_metrics_consumers" consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest") ## Offset (must be either "oldest" or "newest")
offset = "oldest" offset = "oldest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -7,20 +7,35 @@ import (
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup" cluster "github.com/bsm/sarama-cluster"
) )
type Kafka struct { type Kafka struct {
ConsumerGroup string ConsumerGroup string
Topics []string Topics []string
Brokers []string
MaxMessageLen int MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string Cluster *cluster.Consumer
Consumer *consumergroup.ConsumerGroup
// Verify Kafka SSL Certificate
InsecureSkipVerify bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`
// Legacy metric buffer support // Legacy metric buffer support
MetricBuffer int MetricBuffer int
@ -47,12 +62,22 @@ type Kafka struct {
} }
var sampleConfig = ` var sampleConfig = `
## kafka servers
brokers = ["localhost:9092"]
## topic(s) to consume ## topic(s) to consume
topics = ["telegraf"] topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"] ## Optional SSL Config
## Zookeeper Chroot # ssl_ca = "/etc/telegraf/ca.pem"
zookeeper_chroot = "" # ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
## the name of the consumer group ## the name of the consumer group
consumer_group = "telegraf_metrics_consumers" consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest") ## Offset (must be either "oldest" or "newest")
@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
func (k *Kafka) Start(acc telegraf.Accumulator) error { func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
var consumerErr error var clusterErr error
k.acc = acc k.acc = acc
config := consumergroup.NewConfig() config := cluster.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot config.Consumer.Return.Errors = true
tlsConfig, err := internal.GetTLSConfig(
k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
if err != nil {
return err
}
if tlsConfig != nil {
log.Printf("D! TLS Enabled")
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
if k.SASLUsername != "" && k.SASLPassword != "" {
log.Printf("D! Using SASL auth with username '%s',",
k.SASLUsername)
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
config.Net.SASL.Enable = true
}
switch strings.ToLower(k.Offset) { switch strings.ToLower(k.Offset) {
case "oldest", "": case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetOldest
case "newest": case "newest":
config.Offsets.Initial = sarama.OffsetNewest config.Consumer.Offsets.Initial = sarama.OffsetNewest
default: default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset) k.Offset)
config.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetOldest
} }
if k.Consumer == nil || k.Consumer.Closed() { if k.Cluster == nil {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( k.Cluster, clusterErr = cluster.NewConsumer(
k.Brokers,
k.ConsumerGroup, k.ConsumerGroup,
k.Topics, k.Topics,
k.ZookeeperPeers,
config, config,
) )
if consumerErr != nil {
return consumerErr if clusterErr != nil {
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n",
k.Brokers, k.Topics)
return clusterErr
} }
// Setup message and error channels // Setup message and error channels
k.in = k.Consumer.Messages() k.in = k.Cluster.Messages()
k.errs = k.Consumer.Errors() k.errs = k.Cluster.Errors()
} }
k.done = make(chan struct{}) k.done = make(chan struct{})
// Start the kafka message reader // Start the kafka message reader
go k.receiver() go k.receiver()
log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics) k.Brokers, k.Topics)
return nil return nil
} }
@ -156,7 +203,7 @@ func (k *Kafka) receiver() {
// TODO(cam) this locking can be removed if this PR gets merged: // TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84 // https://github.com/wvanbergen/kafka/pull/84
k.Lock() k.Lock()
k.Consumer.CommitUpto(msg) k.Cluster.MarkOffset(msg, "")
k.Unlock() k.Unlock()
} }
} }
@ -167,7 +214,7 @@ func (k *Kafka) Stop() {
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
close(k.done) close(k.done)
if err := k.Consumer.Close(); err != nil { if err := k.Cluster.Close(); err != nil {
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error())) k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
} }
} }

View File

@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) {
} }
brokerPeers := []string{testutil.GetLocalHost() + ":9092"} brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
zkPeers := []string{testutil.GetLocalHost() + ":2181"}
testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix())
// Send a Kafka message to the kafka host // Send a Kafka message to the kafka host
@ -38,7 +37,7 @@ func TestReadsMetricsFromKafka(t *testing.T) {
k := &Kafka{ k := &Kafka{
ConsumerGroup: "telegraf_test_consumers", ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic}, Topics: []string{testTopic},
ZookeeperPeers: zkPeers, Brokers: brokerPeers,
PointBuffer: 100000, PointBuffer: 100000,
Offset: "oldest", Offset: "oldest",
} }

View File

@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
k := Kafka{ k := Kafka{
ConsumerGroup: "test", ConsumerGroup: "test",
Topics: []string{"telegraf"}, Topics: []string{"telegraf"},
ZookeeperPeers: []string{"localhost:2181"}, Brokers: []string{"localhost:9092"},
Offset: "oldest", Offset: "oldest",
in: in, in: in,
doNotCommitMsgs: true, doNotCommitMsgs: true,

View File

@ -0,0 +1,39 @@
# Kafka Consumer Input Plugin
The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
topic and adds messages to InfluxDB. The plugin assumes messages follow the
line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.
## Configuration
```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
```
## Testing
Running integration tests requires running Zookeeper & Kafka. See Makefile
for kafka container command.

View File

@ -0,0 +1,183 @@
package kafka_consumer_legacy
import (
"fmt"
"log"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
)
type Kafka struct {
ConsumerGroup string
Topics []string
MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
// Legacy metric buffer support
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string
parser parsers.Parser
sync.Mutex
// channel for all incoming kafka messages
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan error
done chan struct{}
// keep the accumulator internally:
acc telegraf.Accumulator
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
doNotCommitMsgs bool
}
var sampleConfig = `
## topic(s) to consume
topics = ["telegraf"]
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
## Zookeeper Chroot
zookeeper_chroot = ""
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
`
func (k *Kafka) SampleConfig() string {
return sampleConfig
}
func (k *Kafka) Description() string {
return "Read metrics from Kafka topic(s)"
}
func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
k.acc = acc
config := consumergroup.NewConfig()
config.Zookeeper.Chroot = k.ZookeeperChroot
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
}
if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr
}
// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
}
k.done = make(chan struct{})
// Start the kafka message reader
go k.receiver()
log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
// receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (k *Kafka) receiver() {
for {
select {
case <-k.done:
return
case err := <-k.errs:
if err != nil {
k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
}
case msg := <-k.in:
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen))
} else {
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()))
}
for _, metric := range metrics {
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
if !k.doNotCommitMsgs {
// TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84
k.Lock()
k.Consumer.CommitUpto(msg)
k.Unlock()
}
}
}
}
func (k *Kafka) Stop() {
k.Lock()
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
}
}
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("kafka_consumer_legacy", func() telegraf.Input {
return &Kafka{}
})
}

View File

@ -0,0 +1,96 @@
package kafka_consumer_legacy
import (
"fmt"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
)
func TestReadsMetricsFromKafka(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
zkPeers := []string{testutil.GetLocalHost() + ":2181"}
testTopic := fmt.Sprintf("telegraf_test_topic_legacy_%d", time.Now().Unix())
// Send a Kafka message to the kafka host
msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257\n"
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
require.NoError(t, err)
_, _, err = producer.SendMessage(
&sarama.ProducerMessage{
Topic: testTopic,
Value: sarama.StringEncoder(msg),
})
require.NoError(t, err)
defer producer.Close()
// Start the Kafka Consumer
k := &Kafka{
ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic},
ZookeeperPeers: zkPeers,
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := k.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer k.Stop()
}
waitForPoint(&acc, t)
// Gather points
err = acc.GatherError(k.Gather)
require.NoError(t, err)
if len(acc.Metrics) == 1 {
point := acc.Metrics[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
} else {
t.Errorf("No points found in accumulator, expected 1")
}
}
// Waits for the metric that was sent to the kafka broker to arrive at the kafka
// consumer
func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
// Give the kafka container up to 2 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
counter := 0
for {
select {
case <-ticker.C:
counter++
if counter > 1000 {
t.Fatal("Waited for 5s, point never arrived to consumer")
} else if acc.NFields() == 1 {
return
}
}
}
}

View File

@ -0,0 +1,150 @@
package kafka_consumer_legacy
import (
"strings"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)
const (
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
)
func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
in := make(chan *sarama.ConsumerMessage, 1000)
k := Kafka{
ConsumerGroup: "test",
Topics: []string{"telegraf"},
ZookeeperPeers: []string{"localhost:2181"},
Offset: "oldest",
in: in,
doNotCommitMsgs: true,
errs: make(chan error, 1000),
done: make(chan struct{}),
}
return &k, in
}
// Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) {
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(invalidMsg)
acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0)
}
// Test that overlong messages are dropped
func TestDropOverlongMsg(t *testing.T) {
const maxMessageLen = 64 * 1024
k, in := newTestKafka()
k.MaxMessageLen = maxMessageLen
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
overlongMsg := strings.Repeat("v", maxMessageLen+1)
go k.receiver()
in <- saramaMsg(overlongMsg)
acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0)
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGather(t *testing.T) {
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
acc.GatherError(k.Gather)
assert.Equal(t, acc.NFields(), 1)
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherGraphite(t *testing.T) {
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgGraphite)
acc.Wait(1)
acc.GatherError(k.Gather)
assert.Equal(t, acc.NFields(), 1)
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherJSON(t *testing.T) {
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
acc.GatherError(k.Gather)
assert.Equal(t, acc.NFields(), 2)
acc.AssertContainsFields(t, "kafka_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}
func saramaMsg(val string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
}
}