Add topic tag options to kafka output (#7142)

This commit is contained in:
Daniel Nelson 2020-03-10 13:38:26 -07:00 committed by GitHub
parent 41f39f5f59
commit b6de4da41f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 199 additions and 15 deletions

View File

@ -6,9 +6,11 @@ import (
"log" "log"
"strings" "strings"
"sync" "sync"
"time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -83,6 +85,7 @@ const (
defaultMaxUndeliveredMessages = 1000 defaultMaxUndeliveredMessages = 1000
defaultMaxMessageLen = 1000000 defaultMaxMessageLen = 1000000
defaultConsumerGroup = "telegraf_metrics_consumers" defaultConsumerGroup = "telegraf_metrics_consumers"
reconnectDelay = 5 * time.Second
) )
type empty struct{} type empty struct{}
@ -259,6 +262,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
err := k.consumer.Consume(ctx, k.Topics, handler) err := k.consumer.Consume(ctx, k.Topics, handler)
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
internal.SleepContext(ctx, reconnectDelay)
} }
} }
err = k.consumer.Close() err = k.consumer.Close()

View File

@ -10,6 +10,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## Kafka topic for producer messages ## Kafka topic for producer messages
topic = "telegraf" topic = "telegraf"
## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""
## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false
## Optional Client id ## Optional Client id
# client_id = "Telegraf" # client_id = "Telegraf"

View File

@ -26,16 +26,18 @@ var zeroTime = time.Unix(0, 0)
type ( type (
Kafka struct { Kafka struct {
Brokers []string Brokers []string `toml:"brokers"`
Topic string Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
ClientID string `toml:"client_id"` ClientID string `toml:"client_id"`
TopicSuffix TopicSuffix `toml:"topic_suffix"` TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"` RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"` RoutingKey string `toml:"routing_key"`
CompressionCodec int CompressionCodec int `toml:"compression_codec"`
RequiredAcks int RequiredAcks int `toml:"required_acks"`
MaxRetry int MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"` MaxMessageBytes int `toml:"max_message_bytes"`
Version string `toml:"version"` Version string `toml:"version"`
@ -57,7 +59,9 @@ type (
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
tlsConfig tls.Config tlsConfig tls.Config
producer sarama.SyncProducer
producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
producer sarama.SyncProducer
serializer serializers.Serializer serializer serializers.Serializer
} }
@ -94,6 +98,13 @@ var sampleConfig = `
## Kafka topic for producer messages ## Kafka topic for producer messages
topic = "telegraf" topic = "telegraf"
## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""
## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false
## Optional Client id ## Optional Client id
# client_id = "Telegraf" # client_id = "Telegraf"
@ -212,14 +223,29 @@ func ValidateTopicSuffixMethod(method string) error {
return fmt.Errorf("Unknown topic suffix method provided: %s", method) return fmt.Errorf("Unknown topic suffix method provided: %s", method)
} }
func (k *Kafka) GetTopicName(metric telegraf.Metric) string { func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
topic := k.Topic
if k.TopicTag != "" {
if t, ok := metric.GetTag(k.TopicTag); ok {
topic = t
// If excluding the topic tag, a copy is required to avoid modifying
// the metric buffer.
if k.ExcludeTopicTag {
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(k.TopicTag)
}
}
}
var topicName string var topicName string
switch k.TopicSuffix.Method { switch k.TopicSuffix.Method {
case "measurement": case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name() topicName = topic + k.TopicSuffix.Separator + metric.Name()
case "tags": case "tags":
var topicNameComponents []string var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic) topicNameComponents = append(topicNameComponents, topic)
for _, tag := range k.TopicSuffix.Keys { for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag] tagValue := metric.Tags()[tag]
if tagValue != "" { if tagValue != "" {
@ -228,9 +254,9 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
} }
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator) topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default: default:
topicName = k.Topic topicName = topic
} }
return topicName return metric, topicName
} }
func (k *Kafka) SetSerializer(serializer serializers.Serializer) { func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
@ -306,7 +332,7 @@ func (k *Kafka) Connect() error {
config.Net.SASL.Version = version config.Net.SASL.Version = version
} }
producer, err := sarama.NewSyncProducer(k.Brokers, config) producer, err := k.producerFunc(k.Brokers, config)
if err != nil { if err != nil {
return err return err
} }
@ -348,6 +374,8 @@ func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) {
func (k *Kafka) Write(metrics []telegraf.Metric) error { func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics { for _, metric := range metrics {
metric, topic := k.GetTopicName(metric)
buf, err := k.serializer.Serialize(metric) buf, err := k.serializer.Serialize(metric)
if err != nil { if err != nil {
k.Log.Debugf("Could not serialize metric: %v", err) k.Log.Debugf("Could not serialize metric: %v", err)
@ -355,7 +383,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
} }
m := &sarama.ProducerMessage{ m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric), Topic: topic,
Value: sarama.ByteEncoder(buf), Value: sarama.ByteEncoder(buf),
} }
@ -403,6 +431,7 @@ func init() {
return &Kafka{ return &Kafka{
MaxRetry: 3, MaxRetry: 3,
RequiredAcks: -1, RequiredAcks: -1,
producerFunc: sarama.NewSyncProducer,
} }
}) })
} }

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
@ -81,7 +82,7 @@ func TestTopicSuffixes(t *testing.T) {
TopicSuffix: topicSuffix, TopicSuffix: topicSuffix,
} }
topic := k.GetTopicName(metric) _, topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic) require.Equal(t, expectedTopic, topic)
} }
} }
@ -156,3 +157,146 @@ func TestRoutingKey(t *testing.T) {
}) })
} }
} }
type MockProducer struct {
sent []*sarama.ProducerMessage
}
func (p *MockProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
p.sent = append(p.sent, msg)
return 0, 0, nil
}
func (p *MockProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
p.sent = append(p.sent, msgs...)
return nil
}
func (p *MockProducer) Close() error {
return nil
}
func NewMockProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) {
return &MockProducer{}, nil
}
func TestTopicTag(t *testing.T) {
tests := []struct {
name string
plugin *Kafka
input []telegraf.Metric
topic string
value string
}{
{
name: "static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "topic tag overrides static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu,topic=xyzzy time_idle=42 0\n",
},
{
name: "missing topic tag falls back to static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "exclude topic tag removes tag",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
ExcludeTopicTag: true,
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu time_idle=42 0\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
tt.plugin.SetSerializer(s)
err = tt.plugin.Connect()
require.NoError(t, err)
producer := &MockProducer{}
tt.plugin.producer = producer
err = tt.plugin.Write(tt.input)
require.NoError(t, err)
require.Equal(t, tt.topic, producer.sent[0].Topic)
encoded, err := producer.sent[0].Value.Encode()
require.NoError(t, err)
require.Equal(t, tt.value, string(encoded))
})
}
}