Add support for static and random routing keys in kafka output (#4579)

This commit is contained in:
Daniel Nelson 2018-08-21 12:44:10 -07:00 committed by GitHub
parent edb6e1f655
commit d2cf9a7157
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 19 deletions

View File

@ -50,6 +50,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Static routing key. Used when no routing_tag is set or as a fallback
## when the tag specified in routing tag is not found. If set to "random",
## a random value will be generated for each message.
## ex: routing_key = "random"
## routing_key = "telegraf"
# routing_key = ""
## CompressionCodec represents the various compression codecs recognized by ## CompressionCodec represents the various compression codecs recognized by
## Kafka in messages. ## Kafka in messages.
## 0 : No compression ## 0 : No compression

View File

@ -10,6 +10,7 @@ import (
tlsint "github.com/influxdata/telegraf/internal/tls" tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
uuid "github.com/satori/go.uuid"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
@ -22,24 +23,16 @@ var ValidTopicSuffixMethods = []string{
type ( type (
Kafka struct { Kafka struct {
// Kafka brokers to send metrics to Brokers []string
Brokers []string Topic string
// Kafka topic ClientID string `toml:"client_id"`
Topic string TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Kafka client id RoutingTag string `toml:"routing_tag"`
ClientID string `toml:"client_id"` RoutingKey string `toml:"routing_key"`
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int CompressionCodec int
// RequiredAcks Tag RequiredAcks int
RequiredAcks int MaxRetry int
// MaxRetry Tag MaxMessageBytes int `toml:"max_message_bytes"`
MaxRetry int
// Max Message Bytes
MaxMessageBytes int `toml:"max_message_bytes"`
Version string `toml:"version"` Version string `toml:"version"`
@ -116,6 +109,13 @@ var sampleConfig = `
## ie, if this tag exists, its value will be used as the routing key ## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host" routing_tag = "host"
## Static routing key. Used when no routing_tag is set or as a fallback
## when the tag specified in routing tag is not found. If set to "random",
## a random value will be generated for each message.
## ex: routing_key = "random"
## routing_key = "telegraf"
# routing_key = ""
## CompressionCodec represents the various compression codecs recognized by ## CompressionCodec represents the various compression codecs recognized by
## Kafka in messages. ## Kafka in messages.
## 0 : No compression ## 0 : No compression
@ -273,6 +273,22 @@ func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to" return "Configuration for the Kafka server to send metrics to"
} }
func (k *Kafka) routingKey(metric telegraf.Metric) string {
if k.RoutingTag != "" {
key, ok := metric.GetTag(k.RoutingTag)
if ok {
return key
}
}
if k.RoutingKey == "random" {
u := uuid.NewV4()
return u.String()
}
return k.RoutingKey
}
func (k *Kafka) Write(metrics []telegraf.Metric) error { func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics { for _, metric := range metrics {
@ -285,8 +301,9 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
Topic: k.GetTopicName(metric), Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf), Value: sarama.ByteEncoder(buf),
} }
if h, ok := metric.GetTag(k.RoutingTag); ok { key := k.routingKey(metric)
m.Key = sarama.StringEncoder(h) if key != "" {
m.Key = sarama.StringEncoder(key)
} }
msgs = append(msgs, m) msgs = append(msgs, m)
} }

View File

@ -2,7 +2,10 @@ package kafka
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -96,3 +99,59 @@ func TestValidateTopicSuffixMethod(t *testing.T) {
require.NoError(t, err, "Topic suffix method used should be valid.") require.NoError(t, err, "Topic suffix method used should be valid.")
} }
} }
func TestRoutingKey(t *testing.T) {
tests := []struct {
name string
kafka *Kafka
metric telegraf.Metric
check func(t *testing.T, routingKey string)
}{
{
name: "static routing key",
kafka: &Kafka{
RoutingKey: "static",
},
metric: func() telegraf.Metric {
m, _ := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
return m
}(),
check: func(t *testing.T, routingKey string) {
require.Equal(t, "static", routingKey)
},
},
{
name: "random routing key",
kafka: &Kafka{
RoutingKey: "random",
},
metric: func() telegraf.Metric {
m, _ := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
return m
}(),
check: func(t *testing.T, routingKey string) {
require.Equal(t, 36, len(routingKey))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key := tt.kafka.routingKey(tt.metric)
tt.check(t, key)
})
}
}