Add Kafka output plugin topic_suffix option (#3196)
This commit is contained in:
parent
df9bb7278b
commit
843e6ac044
|
@ -8,6 +8,34 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
brokers = ["localhost:9092"]
|
brokers = ["localhost:9092"]
|
||||||
## Kafka topic for producer messages
|
## Kafka topic for producer messages
|
||||||
topic = "telegraf"
|
topic = "telegraf"
|
||||||
|
|
||||||
|
## Optional topic suffix configuration.
|
||||||
|
## If the section is omitted, no suffix is used.
|
||||||
|
## Following topic suffix methods are supported:
|
||||||
|
## measurement - suffix equals to separator + measurement's name
|
||||||
|
## tags - suffix equals to separator + specified tags' values
|
||||||
|
## interleaved with separator
|
||||||
|
|
||||||
|
## Suffix equals to "_" + measurement's name
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "measurement"
|
||||||
|
# separator = "_"
|
||||||
|
|
||||||
|
## Suffix equals to "__" + measurement's "foo" tag value.
|
||||||
|
## If there's no such a tag, suffix equals to an empty string
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "tags"
|
||||||
|
# keys = ["foo"]
|
||||||
|
# separator = "__"
|
||||||
|
|
||||||
|
## Suffix equals to "_" + measurement's "foo" and "bar"
|
||||||
|
## tag values, separated by "_". If there is no such tags,
|
||||||
|
## their values treated as empty strings.
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "tags"
|
||||||
|
# keys = ["foo", "bar"]
|
||||||
|
# separator = "_"
|
||||||
|
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, its value will be used as the routing key
|
## ie, if this tag exists, its value will be used as the routing key
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
|
@ -57,10 +85,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
|
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
|
||||||
* `topic`: The `kafka` topic to publish to.
|
* `topic`: The `kafka` topic to publish to.
|
||||||
|
|
||||||
|
|
||||||
### Optional parameters:
|
### Optional parameters:
|
||||||
|
|
||||||
* `routing_tag`: if this tag exists, its value will be used as the routing key
|
* `routing_tag`: If this tag exists, its value will be used as the routing key
|
||||||
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
|
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
|
||||||
* `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster.
|
* `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster.
|
||||||
* `max_retry`: Max number of times to retry failed write
|
* `max_retry`: Max number of times to retry failed write
|
||||||
|
@ -69,3 +96,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
* `ssl_key`: SSL key
|
* `ssl_key`: SSL key
|
||||||
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
|
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
|
||||||
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
|
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
|
||||||
|
* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use.
|
||||||
|
For examples, please refer to sample configuration.
|
|
@ -3,6 +3,7 @@ package kafka
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
@ -12,54 +13,97 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Kafka struct {
|
var ValidTopicSuffixMethods = []string{
|
||||||
// Kafka brokers to send metrics to
|
"",
|
||||||
Brokers []string
|
"measurement",
|
||||||
// Kafka topic
|
"tags",
|
||||||
Topic string
|
|
||||||
// Routing Key Tag
|
|
||||||
RoutingTag string `toml:"routing_tag"`
|
|
||||||
// Compression Codec Tag
|
|
||||||
CompressionCodec int
|
|
||||||
// RequiredAcks Tag
|
|
||||||
RequiredAcks int
|
|
||||||
// MaxRetry Tag
|
|
||||||
MaxRetry int
|
|
||||||
|
|
||||||
// Legacy SSL config options
|
|
||||||
// TLS client certificate
|
|
||||||
Certificate string
|
|
||||||
// TLS client key
|
|
||||||
Key string
|
|
||||||
// TLS certificate authority
|
|
||||||
CA string
|
|
||||||
|
|
||||||
// Path to CA file
|
|
||||||
SSLCA string `toml:"ssl_ca"`
|
|
||||||
// Path to host cert file
|
|
||||||
SSLCert string `toml:"ssl_cert"`
|
|
||||||
// Path to cert key file
|
|
||||||
SSLKey string `toml:"ssl_key"`
|
|
||||||
|
|
||||||
// Skip SSL verification
|
|
||||||
InsecureSkipVerify bool
|
|
||||||
|
|
||||||
// SASL Username
|
|
||||||
SASLUsername string `toml:"sasl_username"`
|
|
||||||
// SASL Password
|
|
||||||
SASLPassword string `toml:"sasl_password"`
|
|
||||||
|
|
||||||
tlsConfig tls.Config
|
|
||||||
producer sarama.SyncProducer
|
|
||||||
|
|
||||||
serializer serializers.Serializer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
Kafka struct {
|
||||||
|
// Kafka brokers to send metrics to
|
||||||
|
Brokers []string
|
||||||
|
// Kafka topic
|
||||||
|
Topic string
|
||||||
|
// Kafka topic suffix option
|
||||||
|
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
||||||
|
// Routing Key Tag
|
||||||
|
RoutingTag string `toml:"routing_tag"`
|
||||||
|
// Compression Codec Tag
|
||||||
|
CompressionCodec int
|
||||||
|
// RequiredAcks Tag
|
||||||
|
RequiredAcks int
|
||||||
|
// MaxRetry Tag
|
||||||
|
MaxRetry int
|
||||||
|
|
||||||
|
// Legacy SSL config options
|
||||||
|
// TLS client certificate
|
||||||
|
Certificate string
|
||||||
|
// TLS client key
|
||||||
|
Key string
|
||||||
|
// TLS certificate authority
|
||||||
|
CA string
|
||||||
|
|
||||||
|
// Path to CA file
|
||||||
|
SSLCA string `toml:"ssl_ca"`
|
||||||
|
// Path to host cert file
|
||||||
|
SSLCert string `toml:"ssl_cert"`
|
||||||
|
// Path to cert key file
|
||||||
|
SSLKey string `toml:"ssl_key"`
|
||||||
|
|
||||||
|
// Skip SSL verification
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
// SASL Username
|
||||||
|
SASLUsername string `toml:"sasl_username"`
|
||||||
|
// SASL Password
|
||||||
|
SASLPassword string `toml:"sasl_password"`
|
||||||
|
|
||||||
|
tlsConfig tls.Config
|
||||||
|
producer sarama.SyncProducer
|
||||||
|
|
||||||
|
serializer serializers.Serializer
|
||||||
|
}
|
||||||
|
TopicSuffix struct {
|
||||||
|
Method string `toml:"method"`
|
||||||
|
Keys []string `toml:"keys"`
|
||||||
|
Separator string `toml:"separator"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
## URLs of kafka brokers
|
## URLs of kafka brokers
|
||||||
brokers = ["localhost:9092"]
|
brokers = ["localhost:9092"]
|
||||||
## Kafka topic for producer messages
|
## Kafka topic for producer messages
|
||||||
topic = "telegraf"
|
topic = "telegraf"
|
||||||
|
|
||||||
|
## Optional topic suffix configuration.
|
||||||
|
## If the section is omitted, no suffix is used.
|
||||||
|
## Following topic suffix methods are supported:
|
||||||
|
## measurement - suffix equals to separator + measurement's name
|
||||||
|
## tags - suffix equals to separator + specified tags' values
|
||||||
|
## interleaved with separator
|
||||||
|
|
||||||
|
## Suffix equals to "_" + measurement name
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "measurement"
|
||||||
|
# separator = "_"
|
||||||
|
|
||||||
|
## Suffix equals to "__" + measurement's "foo" tag value.
|
||||||
|
## If there's no such a tag, suffix equals to an empty string
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "tags"
|
||||||
|
# keys = ["foo"]
|
||||||
|
# separator = "__"
|
||||||
|
|
||||||
|
## Suffix equals to "_" + measurement's "foo" and "bar"
|
||||||
|
## tag values, separated by "_". If there is no such tags,
|
||||||
|
## their values treated as empty strings.
|
||||||
|
# [outputs.kafka.topic_suffix]
|
||||||
|
# method = "tags"
|
||||||
|
# keys = ["foo", "bar"]
|
||||||
|
# separator = "_"
|
||||||
|
|
||||||
## Telegraf tag to use as a routing key
|
## Telegraf tag to use as a routing key
|
||||||
## ie, if this tag exists, its value will be used as the routing key
|
## ie, if this tag exists, its value will be used as the routing key
|
||||||
routing_tag = "host"
|
routing_tag = "host"
|
||||||
|
@ -108,11 +152,45 @@ var sampleConfig = `
|
||||||
data_format = "influx"
|
data_format = "influx"
|
||||||
`
|
`
|
||||||
|
|
||||||
|
func ValidateTopicSuffixMethod(method string) error {
|
||||||
|
for _, validMethod := range ValidTopicSuffixMethods {
|
||||||
|
if method == validMethod {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Unkown topic suffix method provided: %s", method)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
|
||||||
|
var topicName string
|
||||||
|
switch k.TopicSuffix.Method {
|
||||||
|
case "measurement":
|
||||||
|
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
|
||||||
|
case "tags":
|
||||||
|
var topicNameComponents []string
|
||||||
|
topicNameComponents = append(topicNameComponents, k.Topic)
|
||||||
|
for _, tag := range k.TopicSuffix.Keys {
|
||||||
|
tagValue := metric.Tags()[tag]
|
||||||
|
if tagValue != "" {
|
||||||
|
topicNameComponents = append(topicNameComponents, tagValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
|
||||||
|
default:
|
||||||
|
topicName = k.Topic
|
||||||
|
}
|
||||||
|
return topicName
|
||||||
|
}
|
||||||
|
|
||||||
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
|
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
|
||||||
k.serializer = serializer
|
k.serializer = serializer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Kafka) Connect() error {
|
func (k *Kafka) Connect() error {
|
||||||
|
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
config := sarama.NewConfig()
|
config := sarama.NewConfig()
|
||||||
|
|
||||||
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
|
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
|
||||||
|
@ -175,8 +253,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
topicName := k.GetTopicName(metric)
|
||||||
|
|
||||||
m := &sarama.ProducerMessage{
|
m := &sarama.ProducerMessage{
|
||||||
Topic: k.Topic,
|
Topic: topicName,
|
||||||
Value: sarama.ByteEncoder(buf),
|
Value: sarama.ByteEncoder(buf),
|
||||||
}
|
}
|
||||||
if h, ok := metric.Tags()[k.RoutingTag]; ok {
|
if h, ok := metric.Tags()[k.RoutingTag]; ok {
|
||||||
|
|
|
@ -8,6 +8,11 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type topicSuffixTestpair struct {
|
||||||
|
topicSuffix TopicSuffix
|
||||||
|
expectedTopic string
|
||||||
|
}
|
||||||
|
|
||||||
func TestConnectAndWrite(t *testing.T) {
|
func TestConnectAndWrite(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
@ -28,4 +33,66 @@ func TestConnectAndWrite(t *testing.T) {
|
||||||
// Verify that we can successfully write data to the kafka broker
|
// Verify that we can successfully write data to the kafka broker
|
||||||
err = k.Write(testutil.MockMetrics())
|
err = k.Write(testutil.MockMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
k.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTopicSuffixes(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := "Test"
|
||||||
|
|
||||||
|
metric := testutil.TestMetric(1)
|
||||||
|
metricTagName := "tag1"
|
||||||
|
metricTagValue := metric.Tags()[metricTagName]
|
||||||
|
metricName := metric.Name()
|
||||||
|
|
||||||
|
var testcases = []topicSuffixTestpair{
|
||||||
|
// This ensures empty separator is okay
|
||||||
|
{TopicSuffix{Method: "measurement"},
|
||||||
|
topic + metricName},
|
||||||
|
{TopicSuffix{Method: "measurement", Separator: "sep"},
|
||||||
|
topic + "sep" + metricName},
|
||||||
|
{TopicSuffix{Method: "tags", Keys: []string{metricTagName}, Separator: "_"},
|
||||||
|
topic + "_" + metricTagValue},
|
||||||
|
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"},
|
||||||
|
topic + "___" + metricTagValue + "___" + metricTagValue + "___" + metricTagValue},
|
||||||
|
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}},
|
||||||
|
topic + metricTagValue + metricTagValue + metricTagValue},
|
||||||
|
// This ensures non-existing tags are ignored
|
||||||
|
{TopicSuffix{Method: "tags", Keys: []string{"non_existing_tag", "non_existing_tag"}, Separator: "___"},
|
||||||
|
topic},
|
||||||
|
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"},
|
||||||
|
topic + "___" + metricTagValue},
|
||||||
|
// This ensures backward compatibility
|
||||||
|
{TopicSuffix{},
|
||||||
|
topic},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testcase := range testcases {
|
||||||
|
topicSuffix := testcase.topicSuffix
|
||||||
|
expectedTopic := testcase.expectedTopic
|
||||||
|
k := &Kafka{
|
||||||
|
Topic: topic,
|
||||||
|
TopicSuffix: topicSuffix,
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := k.GetTopicName(metric)
|
||||||
|
require.Equal(t, expectedTopic, topic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateTopicSuffixMethod(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := ValidateTopicSuffixMethod("invalid_topic_suffix_method")
|
||||||
|
require.Error(t, err, "Topic suffix method used should be invalid.")
|
||||||
|
|
||||||
|
for _, method := range ValidTopicSuffixMethods {
|
||||||
|
err := ValidateTopicSuffixMethod(method)
|
||||||
|
require.NoError(t, err, "Topic suffix method used should be valid.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue