Add Kafka Consumer Plugin
The Kafka consumer plugin polls a specified Kafka topic and adds messages to InfluxDB. The plugin assumes messages follow the line protocol. Consumer Group is used to talk to the Kafka cluster so multiple instances of telegraf can read from the same topic in parallel.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package all
|
||||
|
||||
import (
|
||||
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
||||
_ "github.com/influxdb/telegraf/plugins/memcached"
|
||||
_ "github.com/influxdb/telegraf/plugins/mysql"
|
||||
_ "github.com/influxdb/telegraf/plugins/postgresql"
|
||||
|
||||
153
plugins/kafka_consumer/kafka_consumer.go
Normal file
153
plugins/kafka_consumer/kafka_consumer.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/wvanbergen/kafka/consumergroup"
|
||||
"gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
type Kafka struct {
|
||||
ConsumerGroupName string
|
||||
Topic string
|
||||
ZookeeperPeers []string
|
||||
Consumer *consumergroup.ConsumerGroup
|
||||
BatchSize int
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# topic to consume
|
||||
topic = "topic_with_metrics"
|
||||
|
||||
# the name of the consumer group
|
||||
consumerGroupName = "telegraf_metrics_consumers"
|
||||
|
||||
# an array of Zookeeper connection strings
|
||||
zookeeperPeers = ["localhost:2181"]
|
||||
|
||||
# Batch size of points sent to InfluxDB
|
||||
batchSize = 1000`
|
||||
|
||||
func (k *Kafka) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (k *Kafka) Description() string {
|
||||
return "read metrics from a Kafka topic"
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Measurement string `json:"measurement"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Time time.Time `json:"time"`
|
||||
}
|
||||
|
||||
func (k *Kafka) Gather(acc plugins.Accumulator) error {
|
||||
var consumerErr error
|
||||
metricQueue := make(chan []byte, 200)
|
||||
|
||||
if k.Consumer == nil {
|
||||
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
|
||||
k.ConsumerGroupName,
|
||||
[]string{k.Topic},
|
||||
k.ZookeeperPeers,
|
||||
nil,
|
||||
)
|
||||
|
||||
if consumerErr != nil {
|
||||
return consumerErr
|
||||
}
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
halt := make(chan bool, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
go func() {
|
||||
<-c
|
||||
halt <- true
|
||||
emitMetrics(k, acc, metricQueue)
|
||||
k.Consumer.Close()
|
||||
}()
|
||||
|
||||
go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
|
||||
}
|
||||
|
||||
return emitMetrics(k, acc, metricQueue)
|
||||
}
|
||||
|
||||
func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
|
||||
timeout := time.After(1 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case batch := <-metricConsumer:
|
||||
var points []tsdb.Point
|
||||
var err error
|
||||
if points, err = tsdb.ParsePoints(batch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, point := range points {
|
||||
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
|
||||
}
|
||||
case <-timeout:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const millisecond = 1000000 * time.Nanosecond
|
||||
|
||||
type ack func(*sarama.ConsumerMessage) error
|
||||
|
||||
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
|
||||
batch := make([]byte, 0)
|
||||
currentBatchSize := 0
|
||||
timeout := time.After(500 * millisecond)
|
||||
var msg *sarama.ConsumerMessage
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg = <-kafkaMsgs:
|
||||
if currentBatchSize != 0 {
|
||||
batch = append(batch, '\n')
|
||||
}
|
||||
|
||||
batch = append(batch, msg.Value...)
|
||||
currentBatchSize++
|
||||
|
||||
if currentBatchSize == maxBatchSize {
|
||||
metricProducer <- batch
|
||||
currentBatchSize = 0
|
||||
batch = make([]byte, 0)
|
||||
ackMsg(msg)
|
||||
}
|
||||
case <-timeout:
|
||||
if currentBatchSize != 0 {
|
||||
metricProducer <- batch
|
||||
currentBatchSize = 0
|
||||
batch = make([]byte, 0)
|
||||
ackMsg(msg)
|
||||
}
|
||||
|
||||
timeout = time.After(500 * millisecond)
|
||||
case <-halt:
|
||||
if currentBatchSize != 0 {
|
||||
metricProducer <- batch
|
||||
ackMsg(msg)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("kafka", func() plugins.Plugin {
|
||||
return &Kafka{}
|
||||
})
|
||||
}
|
||||
62
plugins/kafka_consumer/kafka_consumer_integration_test.go
Normal file
62
plugins/kafka_consumer/kafka_consumer_integration_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReadsMetricsFromKafka(t *testing.T) {
|
||||
var zkPeers, brokerPeers []string
|
||||
|
||||
if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
|
||||
zkPeers = []string{"localhost:2181"}
|
||||
} else {
|
||||
zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
|
||||
}
|
||||
|
||||
if len(os.Getenv("KAFKA_PEERS")) == 0 {
|
||||
brokerPeers = []string{"localhost:9092"}
|
||||
} else {
|
||||
brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
|
||||
}
|
||||
|
||||
k := &Kafka{
|
||||
ConsumerGroupName: "telegraf_test_consumers",
|
||||
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
|
||||
ZookeeperPeers: zkPeers,
|
||||
}
|
||||
|
||||
msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
|
||||
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
|
||||
require.NoError(t, err)
|
||||
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
|
||||
producer.Close()
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
// Sanity check
|
||||
assert.Equal(t, 0, len(acc.Points), "there should not be any points")
|
||||
|
||||
err = k.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
|
||||
|
||||
point := acc.Points[0]
|
||||
assert.Equal(t, "cpu_load_short", point.Measurement)
|
||||
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
|
||||
assert.Equal(t, map[string]string{
|
||||
"host": "server01",
|
||||
"direction": "in",
|
||||
"region": "us-west",
|
||||
}, point.Tags)
|
||||
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
|
||||
}
|
||||
95
plugins/kafka_consumer/kafka_consumer_test.go
Normal file
95
plugins/kafka_consumer/kafka_consumer_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package kafka_consumer
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
|
||||
|
||||
func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
|
||||
halt := make(chan bool, 1)
|
||||
metricChan := make(chan []byte, 1)
|
||||
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
kafkaChan <- saramaMsg(testMsg)
|
||||
}
|
||||
|
||||
expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
|
||||
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
|
||||
batch := <-metricChan
|
||||
assert.Equal(t, expectedBatch, string(batch))
|
||||
|
||||
halt <- true
|
||||
|
||||
return nil
|
||||
}, halt)
|
||||
}
|
||||
|
||||
func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
|
||||
halt := make(chan bool, 1)
|
||||
metricChan := make(chan []byte, 1)
|
||||
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
|
||||
for i := 0; i < 3; i++ {
|
||||
kafkaChan <- saramaMsg(testMsg)
|
||||
}
|
||||
|
||||
expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
|
||||
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
|
||||
batch := <-metricChan
|
||||
assert.Equal(t, expectedBatch, string(batch))
|
||||
|
||||
halt <- true
|
||||
|
||||
return nil
|
||||
}, halt)
|
||||
}
|
||||
|
||||
func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
|
||||
k := &Kafka{}
|
||||
var acc testutil.Accumulator
|
||||
testChan := make(chan []byte, 1)
|
||||
testChan <- []byte(testMsg)
|
||||
|
||||
err := emitMetrics(k, &acc, testChan)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
|
||||
|
||||
point := acc.Points[0]
|
||||
assert.Equal(t, "cpu_load_short", point.Measurement)
|
||||
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
|
||||
assert.Equal(t, map[string]string{
|
||||
"host": "server01",
|
||||
"direction": "in",
|
||||
"region": "us-west",
|
||||
}, point.Tags)
|
||||
|
||||
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
|
||||
}
|
||||
|
||||
func TestEmitMetricsTimesOut(t *testing.T) {
|
||||
k := &Kafka{}
|
||||
var acc testutil.Accumulator
|
||||
testChan := make(chan []byte)
|
||||
|
||||
err := emitMetrics(k, &acc, testChan)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, len(acc.Points), "there should not be a any points")
|
||||
}
|
||||
|
||||
func saramaMsg(val string) *sarama.ConsumerMessage {
|
||||
return &sarama.ConsumerMessage{
|
||||
Key: nil,
|
||||
Value: []byte(val),
|
||||
Offset: 0,
|
||||
Partition: 0,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user