Use sarama's built-in support for consumer groups (#6172)
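For context: sarama's built-in consumer-group API, which this change adopts, follows a create-then-consume-loop pattern. The sketch below is illustrative only and not part of this commit; the broker address, group ID, topic name, and exampleHandler are placeholders (Telegraf's real handler is the ConsumerGroupHandler constructed by NewConsumerGroupHandler in kafka_consumer.go):

    package main

    import (
        "context"
        "log"

        "github.com/Shopify/sarama"
    )

    // exampleHandler is a hypothetical sarama.ConsumerGroupHandler implementation.
    type exampleHandler struct{}

    func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
    func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            log.Printf("topic=%s value=%s", msg.Topic, msg.Value)
            sess.MarkMessage(msg, "") // mark the offset as processed
        }
        return nil
    }

    func main() {
        config := sarama.NewConfig()
        config.Version = sarama.V1_0_0_0 // consumer groups need >= 0.10.2.0

        group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
        if err != nil {
            log.Fatal(err)
        }
        defer group.Close()

        ctx := context.Background()
        for {
            // Consume blocks for the life of a session and returns on
            // rebalance, so it is called in a loop.
            if err := group.Consume(ctx, []string{"telegraf"}, exampleHandler{}); err != nil {
                log.Fatal(err)
            }
            if ctx.Err() != nil {
                return
            }
        }
    }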
@@ -2,219 +2,344 @@ package kafka_consumer
 import (
 	"context"
-	"strings"
 	"testing"
+	"time"
 
 	"github.com/Shopify/sarama"
 	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/plugins/parsers/value"
 	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
-const (
-	testMsg         = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
-	testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
-	testMsgJSON     = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
-	invalidMsg      = "cpu_load_short,host=server01 1422568543702900257\n"
-)
-type TestConsumer struct {
-	errors   chan error
-	messages chan *sarama.ConsumerMessage
-}
+type FakeConsumerGroup struct {
+	brokers []string
+	group   string
+	config  *sarama.Config
+
+	handler sarama.ConsumerGroupHandler
+	errors  chan error
+}
-func (c *TestConsumer) Errors() <-chan error {
-	return c.errors
-}
-
-func (c *TestConsumer) Messages() <-chan *sarama.ConsumerMessage {
-	return c.messages
-}
-
-func (c *TestConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
-}
-
-func (c *TestConsumer) Close() error {
-	return nil
-}
-
-func (c *TestConsumer) Inject(msg *sarama.ConsumerMessage) {
-	c.messages <- msg
-}
+func (g *FakeConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
+	g.handler = handler
+	g.handler.Setup(nil)
+	return nil
+}
+
+func (g *FakeConsumerGroup) Errors() <-chan error {
+	return g.errors
+}
-func newTestKafka() (*Kafka, *TestConsumer) {
-	consumer := &TestConsumer{
-		errors:   make(chan error),
-		messages: make(chan *sarama.ConsumerMessage, 1000),
-	}
-	k := Kafka{
-		cluster:                consumer,
-		ConsumerGroup:          "test",
-		Topics:                 []string{"telegraf"},
-		Brokers:                []string{"localhost:9092"},
-		Offset:                 "oldest",
-		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
-		doNotCommitMsgs:        true,
-		messages:               make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
-	}
-	return &k, consumer
-}
+func (g *FakeConsumerGroup) Close() error {
+	close(g.errors)
+	return nil
+}
+
+type FakeCreator struct {
+	ConsumerGroup *FakeConsumerGroup
+}
+
+func (c *FakeCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error) {
+	c.ConsumerGroup.brokers = brokers
+	c.ConsumerGroup.group = group
+	c.ConsumerGroup.config = config
+	return c.ConsumerGroup, nil
+}
+func TestInit(t *testing.T) {
+	tests := []struct {
+		name      string
+		plugin    *KafkaConsumer
+		initError bool
+		check     func(t *testing.T, plugin *KafkaConsumer)
+	}{
+		{
+			name:   "default config",
+			plugin: &KafkaConsumer{},
+			check: func(t *testing.T, plugin *KafkaConsumer) {
+				require.Equal(t, plugin.ConsumerGroup, defaultConsumerGroup)
+				require.Equal(t, plugin.MaxUndeliveredMessages, defaultMaxUndeliveredMessages)
+				require.Equal(t, plugin.config.ClientID, "Telegraf")
+				require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetOldest)
+			},
+		},
+		{
+			name: "parses valid version string",
+			plugin: &KafkaConsumer{
+				Version: "1.0.0",
+			},
+			check: func(t *testing.T, plugin *KafkaConsumer) {
+				require.Equal(t, plugin.config.Version, sarama.V1_0_0_0)
+			},
+		},
+		{
+			name: "invalid version string",
+			plugin: &KafkaConsumer{
+				Version: "100",
+			},
+			initError: true,
+		},
+		{
+			name: "custom client_id",
+			plugin: &KafkaConsumer{
+				ClientID: "custom",
+			},
+			check: func(t *testing.T, plugin *KafkaConsumer) {
+				require.Equal(t, plugin.config.ClientID, "custom")
+			},
+		},
+		{
+			name: "custom offset",
+			plugin: &KafkaConsumer{
+				Offset: "newest",
+			},
+			check: func(t *testing.T, plugin *KafkaConsumer) {
+				require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetNewest)
+			},
+		},
+		{
+			name: "invalid offset",
+			plugin: &KafkaConsumer{
+				Offset: "middle",
+			},
+			initError: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cg := &FakeConsumerGroup{}
+			tt.plugin.ConsumerCreator = &FakeCreator{ConsumerGroup: cg}
+			err := tt.plugin.Init()
+			if tt.initError {
+				require.Error(t, err)
+				return
+			}
+
+			tt.check(t, tt.plugin)
+		})
+	}
+}
-func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) {
-	consumer := &TestConsumer{
-		errors:   make(chan error),
-		messages: make(chan *sarama.ConsumerMessage, 1000),
-	}
-	k := Kafka{
-		cluster:                consumer,
-		ConsumerGroup:          "test",
-		Topics:                 []string{"telegraf"},
-		Brokers:                []string{"localhost:9092"},
-		Offset:                 "oldest",
-		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
-		doNotCommitMsgs:        true,
-		messages:               make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
-		TopicTag:               "topic",
-	}
-	return &k, consumer
-}
-// Test that the parser parses kafka messages into points
-func TestRunParser(t *testing.T) {
-	k, consumer := newTestKafka()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewInfluxParser()
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(testMsg))
-	acc.Wait(1)
-
-	assert.Equal(t, acc.NFields(), 1)
-}
-
-// Test that the parser parses kafka messages into points
-// and adds the topic tag
-func TestRunParserWithTopic(t *testing.T) {
-	k, consumer := newTestKafkaWithTopicTag()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewInfluxParser()
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic"))
-	acc.Wait(1)
-
-	assert.Equal(t, acc.NFields(), 1)
-	assert.True(t, acc.HasTag("cpu_load_short", "topic"))
-}
-
-// Test that the parser ignores invalid messages
-func TestRunParserInvalidMsg(t *testing.T) {
-	k, consumer := newTestKafka()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewInfluxParser()
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(invalidMsg))
-	acc.WaitError(1)
-
-	assert.Equal(t, acc.NFields(), 0)
-}
-
-// Test that overlong messages are dropped
-func TestDropOverlongMsg(t *testing.T) {
-	const maxMessageLen = 64 * 1024
-	k, consumer := newTestKafka()
-	k.MaxMessageLen = maxMessageLen
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-	overlongMsg := strings.Repeat("v", maxMessageLen+1)
-
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(overlongMsg))
-	acc.WaitError(1)
-
-	assert.Equal(t, acc.NFields(), 0)
-}
-// Test that the parser parses kafka messages into points
-func TestRunParserAndGather(t *testing.T) {
-	k, consumer := newTestKafka()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewInfluxParser()
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(testMsg))
-	acc.Wait(1)
-
-	acc.GatherError(k.Gather)
-
-	assert.Equal(t, acc.NFields(), 1)
-	acc.AssertContainsFields(t, "cpu_load_short",
-		map[string]interface{}{"value": float64(23422)})
-}
-
-// Test that the parser parses kafka messages into points
-func TestRunParserAndGatherGraphite(t *testing.T) {
-	k, consumer := newTestKafka()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(testMsgGraphite))
-	acc.Wait(1)
-
-	acc.GatherError(k.Gather)
-
-	assert.Equal(t, acc.NFields(), 1)
-	acc.AssertContainsFields(t, "cpu_load_short_graphite",
-		map[string]interface{}{"value": float64(23422)})
-}
-// Test that the parser parses kafka messages into points
-func TestRunParserAndGatherJSON(t *testing.T) {
-	k, consumer := newTestKafka()
-	acc := testutil.Accumulator{}
-	ctx := context.Background()
-
-	k.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "kafka_json_test",
-	})
-	go k.receiver(ctx, &acc)
-	consumer.Inject(saramaMsg(testMsgJSON))
-	acc.Wait(1)
-
-	acc.GatherError(k.Gather)
-
-	assert.Equal(t, acc.NFields(), 2)
-	acc.AssertContainsFields(t, "kafka_json_test",
-		map[string]interface{}{
-			"a":   float64(5),
-			"b_c": float64(6),
-		})
-}
-func saramaMsg(val string) *sarama.ConsumerMessage {
-	return &sarama.ConsumerMessage{
-		Key:       nil,
-		Value:     []byte(val),
-		Offset:    0,
-		Partition: 0,
-	}
-}
-func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage {
-	return &sarama.ConsumerMessage{
-		Key:       nil,
-		Value:     []byte(val),
-		Offset:    0,
-		Partition: 0,
-		Topic:     topic,
-	}
-}
+func TestStartStop(t *testing.T) {
+	cg := &FakeConsumerGroup{errors: make(chan error)}
+	plugin := &KafkaConsumer{
+		ConsumerCreator: &FakeCreator{ConsumerGroup: cg},
+	}
+	err := plugin.Init()
+	require.NoError(t, err)
+
+	var acc testutil.Accumulator
+	err = plugin.Start(&acc)
+	require.NoError(t, err)
+
+	plugin.Stop()
+}
+type FakeConsumerGroupSession struct {
+	ctx context.Context
+}
+
+func (s *FakeConsumerGroupSession) Claims() map[string][]int32 {
+	panic("not implemented")
+}
+
+func (s *FakeConsumerGroupSession) MemberID() string {
+	panic("not implemented")
+}
+
+func (s *FakeConsumerGroupSession) GenerationID() int32 {
+	panic("not implemented")
+}
+
+func (s *FakeConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
+	panic("not implemented")
+}
+
+func (s *FakeConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
+	panic("not implemented")
+}
+
+func (s *FakeConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {
+}
+
+func (s *FakeConsumerGroupSession) Context() context.Context {
+	return s.ctx
+}
+
+type FakeConsumerGroupClaim struct {
+	messages chan *sarama.ConsumerMessage
+}
+
+func (c *FakeConsumerGroupClaim) Topic() string {
+	panic("not implemented")
+}
+
+func (c *FakeConsumerGroupClaim) Partition() int32 {
+	panic("not implemented")
+}
+
+func (c *FakeConsumerGroupClaim) InitialOffset() int64 {
+	panic("not implemented")
+}
+
+func (c *FakeConsumerGroupClaim) HighWaterMarkOffset() int64 {
+	panic("not implemented")
+}
+
+func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
+	return c.messages
+}
+func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
+	acc := &testutil.Accumulator{}
+	parser := &value.ValueParser{MetricName: "cpu", DataType: "int"}
+	cg := NewConsumerGroupHandler(acc, 1, parser)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	session := &FakeConsumerGroupSession{
+		ctx: ctx,
+	}
+	var claim FakeConsumerGroupClaim
+	var err error
+
+	err = cg.Setup(session)
+	require.NoError(t, err)
+
+	cancel()
+	err = cg.ConsumeClaim(session, &claim)
+	require.NoError(t, err)
+
+	err = cg.Cleanup(session)
+	require.NoError(t, err)
+}
+func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
+	acc := &testutil.Accumulator{}
+	parser := &value.ValueParser{MetricName: "cpu", DataType: "int"}
+	cg := NewConsumerGroupHandler(acc, 1, parser)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	session := &FakeConsumerGroupSession{ctx: ctx}
+	claim := &FakeConsumerGroupClaim{
+		messages: make(chan *sarama.ConsumerMessage, 1),
+	}
+
+	err := cg.Setup(session)
+	require.NoError(t, err)
+
+	claim.messages <- &sarama.ConsumerMessage{
+		Topic: "telegraf",
+		Value: []byte("42"),
+	}
+
+	go func() {
+		err = cg.ConsumeClaim(session, claim)
+		require.NoError(t, err)
+	}()
+
+	acc.Wait(1)
+	cancel()
+
+	err = cg.Cleanup(session)
+	require.NoError(t, err)
+
+	expected := []telegraf.Metric{
+		testutil.MustMetric(
+			"cpu",
+			map[string]string{},
+			map[string]interface{}{
+				"value": 42,
+			},
+			time.Now(),
+		),
+	}
+
+	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
+}
+func TestConsumerGroupHandler_Handle(t *testing.T) {
+	tests := []struct {
+		name          string
+		maxMessageLen int
+		topicTag      string
+		msg           *sarama.ConsumerMessage
+		expected      []telegraf.Metric
+	}{
+		{
+			name: "happy path",
+			msg: &sarama.ConsumerMessage{
+				Topic: "telegraf",
+				Value: []byte("42"),
+			},
+			expected: []telegraf.Metric{
+				testutil.MustMetric(
+					"cpu",
+					map[string]string{},
+					map[string]interface{}{
+						"value": 42,
+					},
+					time.Now(),
+				),
+			},
+		},
+		{
+			name:          "message too long",
+			maxMessageLen: 4,
+			msg: &sarama.ConsumerMessage{
+				Topic: "telegraf",
+				Value: []byte("12345"),
+			},
+			expected: []telegraf.Metric{},
+		},
+		{
+			name: "parse error",
+			msg: &sarama.ConsumerMessage{
+				Topic: "telegraf",
+				Value: []byte("not an integer"),
+			},
+			expected: []telegraf.Metric{},
+		},
+		{
+			name:     "add topic tag",
+			topicTag: "topic",
+			msg: &sarama.ConsumerMessage{
+				Topic: "telegraf",
+				Value: []byte("42"),
+			},
+			expected: []telegraf.Metric{
+				testutil.MustMetric(
+					"cpu",
+					map[string]string{
+						"topic": "telegraf",
+					},
+					map[string]interface{}{
+						"value": 42,
+					},
+					time.Now(),
+				),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			acc := &testutil.Accumulator{}
+			parser := &value.ValueParser{MetricName: "cpu", DataType: "int"}
+			cg := NewConsumerGroupHandler(acc, 1, parser)
+			cg.MaxMessageLen = tt.maxMessageLen
+			cg.TopicTag = tt.topicTag
+
+			ctx := context.Background()
+			session := &FakeConsumerGroupSession{ctx: ctx}
+
+			cg.Reserve(ctx)
+			cg.Handle(session, tt.msg)
+
+			testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
+		})
+	}
+}
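Note: because the consumer group, session, and claim are all faked above, these tests run without a Kafka broker. Assuming the standard Telegraf repository layout, something like `go test ./plugins/inputs/kafka_consumer/` should exercise them.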