telegraf/plugins/kafka_consumer/kafka_consumer.go
Emil Stolarsky 0692b4be61 Add Kafka Consumer Plugin
The Kafka consumer plugin polls a specified Kafka topic and adds messages to
InfluxDB. The plugin assumes messages follow the InfluxDB line protocol. A
consumer group is used to talk to the Kafka cluster, so multiple instances of
telegraf can read from the same topic in parallel.
2015-07-02 15:40:13 -04:00
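
Messages on the topic are expected to be InfluxDB line-protocol points. As a
rough sketch of the producing side (not part of this commit; the broker
address, topic name, and sample point are assumptions for illustration), a
point could be published with sarama like this:

package main

import (
    "log"

    "gopkg.in/Shopify/sarama.v1"
)

func main() {
    // Assumed broker address; only the producer needs it directly.
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // One line-protocol point: measurement, tags, fields, timestamp (ns).
    _, _, err = producer.SendMessage(&sarama.ProducerMessage{
        Topic: "topic_with_metrics",
        Value: sarama.StringEncoder("cpu,host=server01 usage_idle=98.2 1435703455000000000"),
    })
    if err != nil {
        log.Fatal(err)
    }
}

The plugin itself discovers brokers through Zookeeper via the consumer group,
so the broker list above applies only to the producer.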

// Package kafka_consumer implements a telegraf plugin that reads InfluxDB
// line-protocol points from a Kafka topic through a Zookeeper-managed
// consumer group.
package kafka_consumer

import (
    "os"
    "os/signal"
    "time"

    "github.com/influxdb/influxdb/tsdb"
    "github.com/influxdb/telegraf/plugins"
    "github.com/wvanbergen/kafka/consumergroup"
    "gopkg.in/Shopify/sarama.v1"
)
// Kafka consumes a Kafka topic as part of a consumer group, so several
// telegraf instances can share the work of reading one topic.
type Kafka struct {
    ConsumerGroupName string
    Topic             string
    ZookeeperPeers    []string
    Consumer          *consumergroup.ConsumerGroup
    BatchSize         int

    // metricQueue carries batched line-protocol payloads from the reader
    // goroutine to emitMetrics. It is created once, alongside the reader,
    // so successive Gather calls drain the same channel.
    metricQueue chan []byte
}
var sampleConfig = `
# topic to consume
topic = "topic_with_metrics"
# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"
# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]
# Batch size of points sent to InfluxDB
batchSize = 1000`
// SampleConfig returns an example configuration block for the plugin.
func (k *Kafka) SampleConfig() string {
    return sampleConfig
}

// Description returns a one-line summary of what the plugin does.
func (k *Kafka) Description() string {
    return "read metrics from a Kafka topic"
}
// Metric is a JSON representation of a point. Nothing in this file references
// it; messages are parsed as line protocol instead.
type Metric struct {
    Measurement string                 `json:"measurement"`
    Values      map[string]interface{} `json:"values"`
    Tags        map[string]string      `json:"tags"`
    Time        time.Time              `json:"time"`
}
// Gather joins the consumer group and starts the reader goroutine on the
// first call, then drains whatever batches have been queued since the
// previous interval.
func (k *Kafka) Gather(acc plugins.Accumulator) error {
    var consumerErr error

    if k.Consumer == nil {
        k.metricQueue = make(chan []byte, 200)
        k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
            k.ConsumerGroupName,
            []string{k.Topic},
            k.ZookeeperPeers,
            nil,
        )
        if consumerErr != nil {
            return consumerErr
        }

        // On SIGINT, stop the reader, flush anything already read, and
        // close the consumer so its offsets are committed.
        c := make(chan os.Signal, 1)
        halt := make(chan bool, 1)
        signal.Notify(c, os.Interrupt)
        go func() {
            <-c
            halt <- true
            emitMetrics(k, acc, k.metricQueue)
            k.Consumer.Close()
        }()

        go readFromKafka(k.Consumer.Messages(), k.metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
    }

    return emitMetrics(k, acc, k.metricQueue)
}
// emitMetrics parses each queued batch as line protocol and hands the points
// to the accumulator, draining the queue for up to one second per call.
func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
    timeout := time.After(1 * time.Second)

    for {
        select {
        case batch := <-metricConsumer:
            var points []tsdb.Point
            var err error
            if points, err = tsdb.ParsePoints(batch); err != nil {
                return err
            }
            for _, point := range points {
                acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
            }
        case <-timeout:
            return nil
        }
    }
}
// ack marks a Kafka message as processed, e.g. by committing its offset.
type ack func(*sarama.ConsumerMessage) error
// readFromKafka joins message payloads with newlines and forwards a batch
// when it reaches maxBatchSize, when 500ms pass without one filling up, or
// when halt is signalled. The last message of each forwarded batch is
// acknowledged so the consumer group's offset advances.
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
    batch := make([]byte, 0)
    currentBatchSize := 0
    timeout := time.After(500 * time.Millisecond)

    var msg *sarama.ConsumerMessage

    for {
        select {
        case msg = <-kafkaMsgs:
            if currentBatchSize != 0 {
                batch = append(batch, '\n')
            }
            batch = append(batch, msg.Value...)
            currentBatchSize++

            if currentBatchSize == maxBatchSize {
                metricProducer <- batch
                currentBatchSize = 0
                batch = make([]byte, 0)
                ackMsg(msg)
            }
        case <-timeout:
            // Flush a partial batch so points are not held back
            // indefinitely on a quiet topic.
            if currentBatchSize != 0 {
                metricProducer <- batch
                currentBatchSize = 0
                batch = make([]byte, 0)
                ackMsg(msg)
            }
            timeout = time.After(500 * time.Millisecond)
        case <-halt:
            // Final flush before shutting down.
            if currentBatchSize != 0 {
                metricProducer <- batch
                ackMsg(msg)
            }
            return
        }
    }
}
// init registers the plugin with telegraf under the name "kafka".
func init() {
    plugins.Add("kafka", func() plugins.Plugin {
        return &Kafka{}
    })
}
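
A minimal sketch of a test for the size-based batching in readFromKafka,
placed in the same package (the test name and sample points are illustrative
assumptions, not part of this commit):

package kafka_consumer

import (
    "testing"

    "gopkg.in/Shopify/sarama.v1"
)

func TestReadFromKafkaBatchesByMaxSize(t *testing.T) {
    msgs := make(chan *sarama.ConsumerMessage, 2)
    batches := make(chan []byte, 1)
    halt := make(chan bool)
    noopAck := func(*sarama.ConsumerMessage) error { return nil }

    // Two buffered messages; with maxBatchSize = 2 the reader should
    // forward them as one newline-joined batch.
    msgs <- &sarama.ConsumerMessage{Value: []byte("cpu,host=a usage_idle=99")}
    msgs <- &sarama.ConsumerMessage{Value: []byte("cpu,host=b usage_idle=98")}

    go readFromKafka(msgs, batches, 2, noopAck, halt)

    got := <-batches
    want := "cpu,host=a usage_idle=99\ncpu,host=b usage_idle=98"
    if string(got) != want {
        t.Fatalf("batch = %q, want %q", got, want)
    }
    close(halt)
}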