add kafka new consumer support

Kirk Young · 2016-07-27 10:53:04 -07:00
parent 412f5b5acb · commit e863c6f072
3 changed files with 184 additions and 36 deletions

Godeps

@@ -4,6 +4,7 @@ github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25
 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
+github.com/bsm/sarama-cluster dc1a390cf63c40d0a20ee9f79c4be120f79110e5
 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
 github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1
 github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2

plugins/inputs/kafka_consumer/README.md

@@ -6,11 +6,15 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
 is used to talk to the Kafka cluster so multiple instances of telegraf can read
 from the same topic in parallel.
 
-## Configuration
+Now supports the Kafka new consumer (version 0.9+) with TLS.
+
+## Configuration [0.8]
+
 ```toml
 # Read metrics from Kafka topic(s)
 [[inputs.kafka_consumer]]
+  ## is new consumer?
+  new_consumer = false
   ## topic(s) to consume
   topics = ["telegraf"]
   ## an array of Zookeeper connection strings
@@ -30,6 +34,41 @@ from the same topic in parallel.
   data_format = "influx"
 ```
 
+## Configuration [0.9+]
+
+```toml
+# Read metrics from Kafka topic(s)
+[[inputs.kafka_consumer]]
+  ## is new consumer?
+  new_consumer = true
+  ## topic(s) to consume
+  topics = ["telegraf"]
+  ## an array of kafka 0.9+ brokers
+  broker_list = ["localhost:9092"]
+  ## the name of the consumer group
+  consumer_group = "telegraf_kafka_consumer_group"
+  ## Offset (must be either "oldest" or "newest")
+  offset = "oldest"
+
+  ## Optional SSL Config
+  ssl_ca = "/etc/telegraf/ca.pem"
+  ssl_cert = "/etc/telegraf/cert.pem"
+  ssl_key = "/etc/telegraf/cert.key"
+  ## Use SSL but skip chain & host verification
+  insecure_skip_verify = false
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
 ## Testing
 
 Running integration tests requires running Zookeeper & Kafka. See Makefile
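
The ssl_ca / ssl_cert / ssl_key options above are handed to a TLS loader inside telegraf (internal.GetTLSConfig in the code change below). As a rough, hedged sketch of what such a helper plausibly does — buildTLSConfig, its signature, and its error handling are assumptions for illustration, not the commit's code — it loads the client key pair and the CA bundle into a *tls.Config:

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
)

// buildTLSConfig is a hypothetical stand-in for internal.GetTLSConfig.
// A nil, nil return means "no TLS options set", which callers treat as
// TLS disabled.
func buildTLSConfig(certFile, keyFile, caFile string, insecureSkipVerify bool) (*tls.Config, error) {
	if certFile == "" && keyFile == "" && caFile == "" && !insecureSkipVerify {
		return nil, nil
	}
	t := &tls.Config{InsecureSkipVerify: insecureSkipVerify}
	if caFile != "" {
		pem, err := ioutil.ReadFile(caFile)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("could not parse CA certificate %q", caFile)
		}
		t.RootCAs = pool
	}
	if certFile != "" && keyFile != "" {
		cert, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, err
		}
		t.Certificates = []tls.Certificate{cert}
	}
	return t, nil
}

func main() {
	cfg, err := buildTLSConfig("/etc/telegraf/cert.pem", "/etc/telegraf/cert.key", "/etc/telegraf/ca.pem", false)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("TLS enabled: %v\n", cfg != nil)
}
```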

plugins/inputs/kafka_consumer/kafka_consumer.go

@@ -1,39 +1,65 @@
 package kafka_consumer
 
 import (
+	"crypto/tls"
 	"log"
 	"strings"
 	"sync"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/influxdata/telegraf/plugins/parsers"
 
 	"github.com/Shopify/sarama"
+	"github.com/bsm/sarama-cluster"
 	"github.com/wvanbergen/kafka/consumergroup"
 )
 
 type Kafka struct {
-	ConsumerGroup   string
-	Topics          []string
+	// new kafka consumer
+	NewConsumer bool
+	// common for both versions
+	ConsumerGroup string
+	Topics        []string
+	Offset        string
+
+	// for 0.8
 	ZookeeperPeers  []string
 	ZookeeperChroot string
 	Consumer        *consumergroup.ConsumerGroup
 
+	// for 0.9+
+	BrokerList []string
+	Consumer9  *cluster.Consumer
+	// Path to CA file
+	SSLCA string `toml:"ssl_ca"`
+	// Path to host cert file
+	SSLCert string `toml:"ssl_cert"`
+	// Path to cert key file
+	SSLKey string `toml:"ssl_key"`
+	// Skip SSL verification
+	InsecureSkipVerify bool
+	tlsConfig          tls.Config
+
 	// Legacy metric buffer support
 	MetricBuffer int
 	// TODO remove PointBuffer, legacy support
 	PointBuffer int
 
-	Offset string
-
 	parser parsers.Parser
 
 	sync.Mutex
 
 	// channel for all incoming kafka messages
 	in <-chan *sarama.ConsumerMessage
 	// channel for all kafka consumer errors
 	errs <-chan *sarama.ConsumerError
+	errs9 <-chan error
 	done chan struct{}
 
 	// keep the accumulator internally:
@@ -45,6 +71,8 @@ type Kafka struct {
 }
 
 var sampleConfig = `
+  ## is new consumer?
+  new_consumer = false
   ## topic(s) to consume
   topics = ["telegraf"]
   ## an array of Zookeeper connection strings
@@ -82,41 +110,88 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
 	k.acc = acc
 
-	config := consumergroup.NewConfig()
-	config.Zookeeper.Chroot = k.ZookeeperChroot
-	switch strings.ToLower(k.Offset) {
-	case "oldest", "":
-		config.Offsets.Initial = sarama.OffsetOldest
-	case "newest":
-		config.Offsets.Initial = sarama.OffsetNewest
-	default:
-		log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
-			k.Offset)
-		config.Offsets.Initial = sarama.OffsetOldest
-	}
-
-	if k.Consumer == nil || k.Consumer.Closed() {
-		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
-			k.ConsumerGroup,
-			k.Topics,
-			k.ZookeeperPeers,
-			config,
-		)
-		if consumerErr != nil {
-			return consumerErr
-		}
-
-		// Setup message and error channels
-		k.in = k.Consumer.Messages()
-		k.errs = k.Consumer.Errors()
-	}
-
-	k.done = make(chan struct{})
-	// Start the kafka message reader
-	go k.receiver()
-	log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
-		k.ZookeeperPeers, k.Topics)
+	log.Println(k.NewConsumer)
+
+	if !k.NewConsumer {
+		config := consumergroup.NewConfig()
+
+		config.Zookeeper.Chroot = k.ZookeeperChroot
+		switch strings.ToLower(k.Offset) {
+		case "oldest", "":
+			config.Offsets.Initial = sarama.OffsetOldest
+		case "newest":
+			config.Offsets.Initial = sarama.OffsetNewest
+		default:
+			log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
+				k.Offset)
+			config.Offsets.Initial = sarama.OffsetOldest
+		}
+
+		if k.Consumer == nil || k.Consumer.Closed() {
+			k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
+				k.ConsumerGroup,
+				k.Topics,
+				k.ZookeeperPeers,
+				config,
+			)
+			if consumerErr != nil {
+				return consumerErr
+			}
+
+			// Setup message and error channels
+			k.in = k.Consumer.Messages()
+			k.errs = k.Consumer.Errors()
+		}
+
+		k.done = make(chan struct{})
+		// Start the kafka message reader
+		go k.receiver()
+		log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
+			k.ZookeeperPeers, k.Topics)
+	} else {
+		config := cluster.NewConfig()
+		tlsConfig, err := internal.GetTLSConfig(k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
+		if err != nil {
+			return err
+		}
+		if tlsConfig != nil {
+			config.Net.TLS.Config = tlsConfig
+			config.Net.TLS.Enable = true
+		}
+		switch strings.ToLower(k.Offset) {
+		case "oldest", "":
+			config.Consumer.Offsets.Initial = sarama.OffsetOldest
+		case "newest":
+			config.Consumer.Offsets.Initial = sarama.OffsetNewest
+		default:
+			log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
+				k.Offset)
+			config.Consumer.Offsets.Initial = sarama.OffsetOldest
+		}
+
+		// TODO: make this configurable
+		config.Consumer.Return.Errors = true
+
+		if err := config.Validate(); err != nil {
+			return err
+		}
+
+		k.Consumer9, err = cluster.NewConsumer(k.BrokerList, k.ConsumerGroup, k.Topics, config)
+		if err != nil {
+			return err
+		}
+
+		// Setup message and error channels
+		k.in = k.Consumer9.Messages()
+		k.errs9 = k.Consumer9.Errors()
+
+		k.done = make(chan struct{})
+		// Start the kafka message reader for 0.9
+		go k.collector()
+		log.Printf("Started the kafka consumer service with new consumer, brokers: %v, topics: %v\n",
+			k.BrokerList, k.Topics)
+	}
 
 	return nil
 }
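
For context, the sarama-cluster calls the new branch relies on (cluster.NewConfig, cluster.NewConsumer, and the Messages/Errors channels) compose into a small standalone consumer. The sketch below is an illustration of the library's usage pattern, not part of the commit; it assumes a Kafka 0.9+ broker on localhost:9092 and reuses the defaults from the README. MarkOffset only marks a message as processed; the library is expected to commit marked offsets in the background.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	consumer, err := cluster.NewConsumer(
		[]string{"localhost:9092"},      // broker_list
		"telegraf_kafka_consumer_group", // consumer_group
		[]string{"telegraf"},            // topics
		config,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		select {
		case err := <-consumer.Errors():
			log.Printf("consumer error: %v", err)
		case msg := <-consumer.Messages():
			log.Printf("%s/%d@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
			// Mark the message as processed so its offset can be committed.
			consumer.MarkOffset(msg, "")
		}
	}
}
```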
@@ -151,12 +226,45 @@ func (k *Kafka) receiver() {
 	}
 }
 
+// this is for kafka new consumer
+func (k *Kafka) collector() {
+	for {
+		select {
+		case <-k.done:
+			return
+		case err := <-k.errs9:
+			log.Printf("Kafka Consumer Error: %s\n", err.Error())
+		case msg := <-k.in:
+			metrics, err := k.parser.Parse(msg.Value)
+			if err != nil {
+				log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s",
+					string(msg.Value), err.Error())
+			}
+
+			for _, metric := range metrics {
+				k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+			}
+
+			if !k.doNotCommitMsgs {
+				k.Consumer9.MarkOffset(msg, "")
+			}
+		}
+	}
+}
+
 func (k *Kafka) Stop() {
 	k.Lock()
 	defer k.Unlock()
 	close(k.done)
-	if err := k.Consumer.Close(); err != nil {
-		log.Printf("Error closing kafka consumer: %s\n", err.Error())
+	if !k.NewConsumer {
+		if err := k.Consumer.Close(); err != nil {
+			log.Printf("Error closing kafka consumer: %s\n", err.Error())
+		}
+	} else {
+		if err := k.Consumer9.Close(); err != nil {
+			log.Printf("Error closing kafka consumer: %s\n", err.Error())
+		}
 	}
 }
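
Because collector() reads only from the plugin's internal channels, it can be exercised without a broker, in the same style as the existing receiver() tests. The following in-package test is a hypothetical sketch under stated assumptions: the field values, message literal, and timing are illustrative, and it leans on telegraf's own testutil.Accumulator and parsers.NewInfluxParser helpers. doNotCommitMsgs keeps MarkOffset from being called on a nil Consumer9.

```go
package kafka_consumer

import (
	"testing"
	"time"

	"github.com/Shopify/sarama"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/testutil"
)

// TestRunCollector drives the new collector() loop directly through the
// plugin's internal channels, so no Kafka broker is required.
func TestRunCollector(t *testing.T) {
	in := make(chan *sarama.ConsumerMessage, 10)
	k := &Kafka{
		NewConsumer:     true,
		in:              in,
		errs9:           make(chan error, 10),
		done:            make(chan struct{}),
		doNotCommitMsgs: true,
	}
	acc := testutil.Accumulator{}
	k.acc = &acc
	k.parser, _ = parsers.NewInfluxParser()
	defer close(k.done)

	go k.collector()
	// One line-protocol message, as the influx data_format would receive it.
	in <- &sarama.ConsumerMessage{
		Value: []byte("cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"),
	}
	time.Sleep(5 * time.Millisecond)

	if len(acc.Metrics) != 1 {
		t.Fatalf("expected 1 metric, got %d", len(acc.Metrics))
	}
}
```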