Add Kafka output plugin topic_suffix option (#3196)

This commit is contained in:
Pavel Gurkov
2017-09-06 23:18:26 +02:00
committed by Daniel Nelson
parent ab1c11b06d
commit 5d4eec606f
3 changed files with 220 additions and 44 deletions

View File

@@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -12,54 +13,97 @@ import (
"github.com/Shopify/sarama"
)
type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int
// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Skip SSL verification
InsecureSkipVerify bool
// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`
tlsConfig tls.Config
producer sarama.SyncProducer
serializer serializers.Serializer
var ValidTopicSuffixMethods = []string{
"",
"measurement",
"tags",
}
type (
Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int
// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Skip SSL verification
InsecureSkipVerify bool
// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`
tlsConfig tls.Config
producer sarama.SyncProducer
serializer serializers.Serializer
}
TopicSuffix struct {
Method string `toml:"method"`
Keys []string `toml:"keys"`
Separator string `toml:"separator"`
}
)
var sampleConfig = `
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator
## Suffix equals to "_" + measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"
## Suffix equals to "__" + measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo"]
# separator = "__"
## Suffix equals to "_" + measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# separator = "_"
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
@@ -108,11 +152,45 @@ var sampleConfig = `
data_format = "influx"
`
func ValidateTopicSuffixMethod(method string) error {
for _, validMethod := range ValidTopicSuffixMethods {
if method == validMethod {
return nil
}
}
return fmt.Errorf("Unkown topic suffix method provided: %s", method)
}
func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
var topicName string
switch k.TopicSuffix.Method {
case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
case "tags":
var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic)
for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag]
if tagValue != "" {
topicNameComponents = append(topicNameComponents, tagValue)
}
}
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default:
topicName = k.Topic
}
return topicName
}
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}
func (k *Kafka) Connect() error {
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil {
return err
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
@@ -175,8 +253,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return err
}
topicName := k.GetTopicName(metric)
m := &sarama.ProducerMessage{
Topic: k.Topic,
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {