diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index abd9c4921..b112c09cd 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -8,6 +8,34 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm brokers = ["localhost:9092"] ## Kafka topic for producer messages topic = "telegraf" + + ## Optional topic suffix configuration. + ## If the section is omitted, no suffix is used. + ## Following topic suffix methods are supported: + ## measurement - suffix equals to separator + measurement's name + ## tags - suffix equals to separator + specified tags' values + ## interleaved with separator + + ## Suffix equals to "_" + measurement's name + # [outputs.kafka.topic_suffix] + # method = "measurement" + # separator = "_" + + ## Suffix equals to "__" + measurement's "foo" tag value. + ## If there's no such a tag, suffix equals to an empty string + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo"] + # separator = "__" + + ## Suffix equals to "_" + measurement's "foo" and "bar" + ## tag values, separated by "_". If there is no such tags, + ## their values treated as empty strings. + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo", "bar"] + # separator = "_" + ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" @@ -57,10 +85,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm * `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]` * `topic`: The `kafka` topic to publish to. - ### Optional parameters: -* `routing_tag`: if this tag exists, its value will be used as the routing key +* `routing_tag`: If this tag exists, its value will be used as the routing key * `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression * `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster. * `max_retry`: Max number of times to retry failed write @@ -69,3 +96,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm * `ssl_key`: SSL key * `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) * `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md) +* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use. +For examples, please refer to sample configuration. \ No newline at end of file diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 421b5c2a1..c2f2e4460 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "crypto/tls" "fmt" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -12,54 +13,97 @@ import ( "github.com/Shopify/sarama" ) -type Kafka struct { - // Kafka brokers to send metrics to - Brokers []string - // Kafka topic - Topic string - // Routing Key Tag - RoutingTag string `toml:"routing_tag"` - // Compression Codec Tag - CompressionCodec int - // RequiredAcks Tag - RequiredAcks int - // MaxRetry Tag - MaxRetry int - - // Legacy SSL config options - // TLS client certificate - Certificate string - // TLS client key - Key string - // TLS certificate authority - CA string - - // Path to CA file - SSLCA string `toml:"ssl_ca"` - // Path to host cert file - SSLCert string `toml:"ssl_cert"` - // Path to cert key file - SSLKey string `toml:"ssl_key"` - - // Skip SSL verification - InsecureSkipVerify bool - - // SASL Username - SASLUsername string `toml:"sasl_username"` - // SASL Password - SASLPassword string `toml:"sasl_password"` - - tlsConfig tls.Config - producer sarama.SyncProducer - - serializer serializers.Serializer +var ValidTopicSuffixMethods = []string{ + "", + "measurement", + "tags", } +type ( + Kafka struct { + // Kafka brokers to send metrics to + Brokers []string + // Kafka topic + Topic string + // Kafka topic suffix option + TopicSuffix TopicSuffix `toml:"topic_suffix"` + // Routing Key Tag + RoutingTag string `toml:"routing_tag"` + // Compression Codec Tag + CompressionCodec int + // RequiredAcks Tag + RequiredAcks int + // MaxRetry Tag + MaxRetry int + + // Legacy SSL config options + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // Skip SSL verification + InsecureSkipVerify bool + + // SASL Username + SASLUsername string `toml:"sasl_username"` + // SASL Password + SASLPassword string `toml:"sasl_password"` + + tlsConfig tls.Config + producer sarama.SyncProducer + + serializer serializers.Serializer + } + TopicSuffix struct { + Method string `toml:"method"` + Keys []string `toml:"keys"` + Separator string `toml:"separator"` + } +) + var sampleConfig = ` ## URLs of kafka brokers brokers = ["localhost:9092"] ## Kafka topic for producer messages topic = "telegraf" + + ## Optional topic suffix configuration. + ## If the section is omitted, no suffix is used. + ## Following topic suffix methods are supported: + ## measurement - suffix equals to separator + measurement's name + ## tags - suffix equals to separator + specified tags' values + ## interleaved with separator + + ## Suffix equals to "_" + measurement name + # [outputs.kafka.topic_suffix] + # method = "measurement" + # separator = "_" + + ## Suffix equals to "__" + measurement's "foo" tag value. + ## If there's no such a tag, suffix equals to an empty string + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo"] + # separator = "__" + + ## Suffix equals to "_" + measurement's "foo" and "bar" + ## tag values, separated by "_". If there is no such tags, + ## their values treated as empty strings. + # [outputs.kafka.topic_suffix] + # method = "tags" + # keys = ["foo", "bar"] + # separator = "_" + ## Telegraf tag to use as a routing key ## ie, if this tag exists, its value will be used as the routing key routing_tag = "host" @@ -108,11 +152,45 @@ var sampleConfig = ` data_format = "influx" ` +func ValidateTopicSuffixMethod(method string) error { + for _, validMethod := range ValidTopicSuffixMethods { + if method == validMethod { + return nil + } + } + return fmt.Errorf("Unkown topic suffix method provided: %s", method) +} + +func (k *Kafka) GetTopicName(metric telegraf.Metric) string { + var topicName string + switch k.TopicSuffix.Method { + case "measurement": + topicName = k.Topic + k.TopicSuffix.Separator + metric.Name() + case "tags": + var topicNameComponents []string + topicNameComponents = append(topicNameComponents, k.Topic) + for _, tag := range k.TopicSuffix.Keys { + tagValue := metric.Tags()[tag] + if tagValue != "" { + topicNameComponents = append(topicNameComponents, tagValue) + } + } + topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator) + default: + topicName = k.Topic + } + return topicName +} + func (k *Kafka) SetSerializer(serializer serializers.Serializer) { k.serializer = serializer } func (k *Kafka) Connect() error { + err := ValidateTopicSuffixMethod(k.TopicSuffix.Method) + if err != nil { + return err + } config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks) @@ -175,8 +253,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { return err } + topicName := k.GetTopicName(metric) + m := &sarama.ProducerMessage{ - Topic: k.Topic, + Topic: topicName, Value: sarama.ByteEncoder(buf), } if h, ok := metric.Tags()[k.RoutingTag]; ok { diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index f99e0ecea..b18d9f15d 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -8,6 +8,11 @@ import ( "github.com/stretchr/testify/require" ) +type topicSuffixTestpair struct { + topicSuffix TopicSuffix + expectedTopic string +} + func TestConnectAndWrite(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -28,4 +33,66 @@ func TestConnectAndWrite(t *testing.T) { // Verify that we can successfully write data to the kafka broker err = k.Write(testutil.MockMetrics()) require.NoError(t, err) + k.Close() +} + +func TestTopicSuffixes(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + topic := "Test" + + metric := testutil.TestMetric(1) + metricTagName := "tag1" + metricTagValue := metric.Tags()[metricTagName] + metricName := metric.Name() + + var testcases = []topicSuffixTestpair{ + // This ensures empty separator is okay + {TopicSuffix{Method: "measurement"}, + topic + metricName}, + {TopicSuffix{Method: "measurement", Separator: "sep"}, + topic + "sep" + metricName}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName}, Separator: "_"}, + topic + "_" + metricTagValue}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"}, + topic + "___" + metricTagValue + "___" + metricTagValue + "___" + metricTagValue}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}}, + topic + metricTagValue + metricTagValue + metricTagValue}, + // This ensures non-existing tags are ignored + {TopicSuffix{Method: "tags", Keys: []string{"non_existing_tag", "non_existing_tag"}, Separator: "___"}, + topic}, + {TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"}, + topic + "___" + metricTagValue}, + // This ensures backward compatibility + {TopicSuffix{}, + topic}, + } + + for _, testcase := range testcases { + topicSuffix := testcase.topicSuffix + expectedTopic := testcase.expectedTopic + k := &Kafka{ + Topic: topic, + TopicSuffix: topicSuffix, + } + + topic := k.GetTopicName(metric) + require.Equal(t, expectedTopic, topic) + } +} + +func TestValidateTopicSuffixMethod(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + err := ValidateTopicSuffixMethod("invalid_topic_suffix_method") + require.Error(t, err, "Topic suffix method used should be invalid.") + + for _, method := range ValidTopicSuffixMethods { + err := ValidateTopicSuffixMethod(method) + require.NoError(t, err, "Topic suffix method used should be valid.") + } }