From d2cf9a715755657b06ffde37f4509fb1f5df1262 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 21 Aug 2018 12:44:10 -0700 Subject: [PATCH] Add support for static and random routing keys in kafka output (#4579) --- plugins/outputs/kafka/README.md | 7 ++++ plugins/outputs/kafka/kafka.go | 55 +++++++++++++++++---------- plugins/outputs/kafka/kafka_test.go | 59 +++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 19 deletions(-) diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 5f4758baa..25b173a02 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -50,6 +50,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" + ## Static routing key. Used when no routing_tag is set or as a fallback + ## when the tag specified in routing tag is not found. If set to "random", + ## a random value will be generated for each message. + ## ex: routing_key = "random" + ## routing_key = "telegraf" + # routing_key = "" + ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index b9ae35396..f2951e6d5 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -10,6 +10,7 @@ import ( tlsint "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" + uuid "github.com/satori/go.uuid" "github.com/Shopify/sarama" ) @@ -22,24 +23,16 @@ var ValidTopicSuffixMethods = []string{ type ( Kafka struct { - // Kafka brokers to send metrics to - Brokers []string - // Kafka topic - Topic string - // Kafka client id - ClientID string `toml:"client_id"` - // Kafka topic suffix option - TopicSuffix TopicSuffix `toml:"topic_suffix"` - // Routing Key Tag - RoutingTag string `toml:"routing_tag"` - // Compression Codec Tag + Brokers []string + Topic string + ClientID string `toml:"client_id"` + TopicSuffix TopicSuffix `toml:"topic_suffix"` + RoutingTag string `toml:"routing_tag"` + RoutingKey string `toml:"routing_key"` CompressionCodec int - // RequiredAcks Tag - RequiredAcks int - // MaxRetry Tag - MaxRetry int - // Max Message Bytes - MaxMessageBytes int `toml:"max_message_bytes"` + RequiredAcks int + MaxRetry int + MaxMessageBytes int `toml:"max_message_bytes"` Version string `toml:"version"` @@ -116,6 +109,13 @@ var sampleConfig = ` ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" + ## Static routing key. Used when no routing_tag is set or as a fallback + ## when the tag specified in routing tag is not found. If set to "random", + ## a random value will be generated for each message. + ## ex: routing_key = "random" + ## routing_key = "telegraf" + # routing_key = "" + ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression @@ -273,6 +273,22 @@ func (k *Kafka) Description() string { return "Configuration for the Kafka server to send metrics to" } +func (k *Kafka) routingKey(metric telegraf.Metric) string { + if k.RoutingTag != "" { + key, ok := metric.GetTag(k.RoutingTag) + if ok { + return key + } + } + + if k.RoutingKey == "random" { + u := uuid.NewV4() + return u.String() + } + + return k.RoutingKey +} + func (k *Kafka) Write(metrics []telegraf.Metric) error { msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) for _, metric := range metrics { @@ -285,8 +301,9 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { Topic: k.GetTopicName(metric), Value: sarama.ByteEncoder(buf), } - if h, ok := metric.GetTag(k.RoutingTag); ok { - m.Key = sarama.StringEncoder(h) + key := k.routingKey(metric) + if key != "" { + m.Key = sarama.StringEncoder(key) } msgs = append(msgs, m) } diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index b18d9f15d..ba900e32c 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -2,7 +2,10 @@ package kafka import ( "testing" + "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -96,3 +99,59 @@ func TestValidateTopicSuffixMethod(t *testing.T) { require.NoError(t, err, "Topic suffix method used should be valid.") } } + +func TestRoutingKey(t *testing.T) { + tests := []struct { + name string + kafka *Kafka + metric telegraf.Metric + check func(t *testing.T, routingKey string) + }{ + { + name: "static routing key", + kafka: &Kafka{ + RoutingKey: "static", + }, + metric: func() telegraf.Metric { + m, _ := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ) + return m + }(), + check: func(t *testing.T, routingKey string) { + require.Equal(t, "static", routingKey) + }, + }, + { + name: "random routing key", + kafka: &Kafka{ + RoutingKey: "random", + }, + metric: func() telegraf.Metric { + m, _ := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ) + return m + }(), + check: func(t *testing.T, routingKey string) { + require.Equal(t, 36, len(routingKey)) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + key := tt.kafka.routingKey(tt.metric) + tt.check(t, key) + }) + } +}