Add ability to tag metrics with topic in kafka_consumer (#5038)

This commit is contained in:
Sebastien Le Digabel 2018-11-29 00:29:26 +00:00 committed by Daniel Nelson
parent 35d08b2df7
commit 448c98d82e
3 changed files with 56 additions and 1 deletions

View File

@ -14,6 +14,8 @@ and use the old zookeeper connection method.
brokers = ["localhost:9092"] brokers = ["localhost:9092"]
## topic(s) to consume ## topic(s) to consume
topics = ["telegraf"] topics = ["telegraf"]
## Add topic as tag if topic_tag is not empty
# topic_tag = ""
## Optional Client id ## Optional Client id
# client_id = "Telegraf" # client_id = "Telegraf"

View File

@ -40,6 +40,8 @@ type Kafka struct {
Offset string `toml:"offset"` Offset string `toml:"offset"`
SASLUsername string `toml:"sasl_username"` SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"` SASLPassword string `toml:"sasl_password"`
TopicTag string `toml:"topic_tag"`
tls.ClientConfig tls.ClientConfig
cluster Consumer cluster Consumer
@ -60,6 +62,8 @@ var sampleConfig = `
brokers = ["localhost:9092"] brokers = ["localhost:9092"]
## topic(s) to consume ## topic(s) to consume
topics = ["telegraf"] topics = ["telegraf"]
## Add topic as tag if topic_tag is not empty
# topic_tag = ""
## Optional Client id ## Optional Client id
# client_id = "Telegraf" # client_id = "Telegraf"
@ -256,7 +260,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.Consumer
if err != nil { if err != nil {
return err return err
} }
if len(k.TopicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(k.TopicTag, msg.Topic)
}
}
id := acc.AddTrackingMetricGroup(metrics) id := acc.AddTrackingMetricGroup(metrics)
k.messages[id] = msg k.messages[id] = msg

View File

@ -61,6 +61,25 @@ func newTestKafka() (*Kafka, *TestConsumer) {
return &k, consumer return &k, consumer
} }
// newTestKafkaWithTopicTag returns a Kafka plugin instance wired to an
// in-memory TestConsumer, configured identically to newTestKafka except
// that TopicTag is set, so consumed metrics get tagged with their topic.
func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) {
	consumer := &TestConsumer{
		errors:   make(chan error),
		messages: make(chan *sarama.ConsumerMessage, 1000),
	}
	plugin := &Kafka{
		cluster:                consumer,
		ConsumerGroup:          "test",
		Topics:                 []string{"telegraf"},
		Brokers:                []string{"localhost:9092"},
		Offset:                 "oldest",
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		doNotCommitMsgs:        true,
		messages:               make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
		TopicTag:               "topic",
	}
	return plugin, consumer
}
// Test that the parser parses kafka messages into points // Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) { func TestRunParser(t *testing.T) {
k, consumer := newTestKafka() k, consumer := newTestKafka()
@ -75,6 +94,22 @@ func TestRunParser(t *testing.T) {
assert.Equal(t, acc.NFields(), 1) assert.Equal(t, acc.NFields(), 1)
} }
// Test that the parser parses kafka messages into points
// and adds the topic tag when TopicTag is configured.
func TestRunParserWithTopic(t *testing.T) {
	k, consumer := newTestKafkaWithTopicTag()
	acc := testutil.Accumulator{}
	ctx := context.Background()

	// Surface a parser construction error instead of silently
	// discarding it with `_` (which would make later failures cryptic).
	parser, err := parsers.NewInfluxParser()
	assert.NoError(t, err)
	k.parser = parser

	go k.receiver(ctx, &acc)
	consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic"))
	acc.Wait(1)

	// testify's assert.Equal takes (t, expected, actual) — expected first,
	// otherwise failure messages report the values backwards.
	assert.Equal(t, 1, acc.NFields())
	assert.True(t, acc.HasTag("cpu_load_short", "topic"))
}
// Test that the parser ignores invalid messages // Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) { func TestRunParserInvalidMsg(t *testing.T) {
k, consumer := newTestKafka() k, consumer := newTestKafka()
@ -173,3 +208,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage {
Partition: 0, Partition: 0,
} }
} }
// saramaMsgWithTopic builds a sarama.ConsumerMessage carrying val as its
// payload and stamped with the given topic. The remaining fields keep
// their zero values (nil key, offset 0, partition 0).
func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage {
	msg := &sarama.ConsumerMessage{
		Topic:     topic,
		Value:     []byte(val),
		Key:       nil,
		Offset:    0,
		Partition: 0,
	}
	return msg
}