2015-08-26 17:02:10 +00:00
package kafka
import (
2016-01-11 12:20:51 +00:00
"crypto/tls"
2015-08-26 17:02:10 +00:00
"fmt"
2018-08-17 20:51:21 +00:00
"log"
2017-09-06 21:18:26 +00:00
"strings"
2020-01-15 23:26:50 +00:00
"time"
2016-02-03 19:59:34 +00:00
2019-06-04 00:34:48 +00:00
"github.com/Shopify/sarama"
2019-11-08 01:39:19 +00:00
"github.com/gofrs/uuid"
2016-01-27 21:21:36 +00:00
"github.com/influxdata/telegraf"
2018-05-04 23:33:23 +00:00
tlsint "github.com/influxdata/telegraf/internal/tls"
2020-01-03 00:27:26 +00:00
"github.com/influxdata/telegraf/plugins/common/kafka"
2016-01-27 23:15:14 +00:00
"github.com/influxdata/telegraf/plugins/outputs"
2016-02-10 22:50:07 +00:00
"github.com/influxdata/telegraf/plugins/serializers"
2015-08-26 17:02:10 +00:00
)
2017-09-06 21:18:26 +00:00
// ValidTopicSuffixMethods lists the values accepted for TopicSuffix.Method.
// The empty string is included so that an unset method is accepted.
var ValidTopicSuffixMethods = []string{"", "measurement", "tags"}
2020-01-15 23:26:50 +00:00
// zeroTime is the Unix epoch (0 seconds, 0 nanoseconds). Write leaves the
// message timestamp unset for metrics whose time is before this instant.
var zeroTime = time . Unix ( 0 , 0 )
2017-09-06 21:18:26 +00:00
type (
Kafka struct {
2020-03-10 20:38:26 +00:00
Brokers [ ] string ` toml:"brokers" `
Topic string ` toml:"topic" `
TopicTag string ` toml:"topic_tag" `
ExcludeTopicTag bool ` toml:"exclude_topic_tag" `
2018-08-21 19:44:10 +00:00
ClientID string ` toml:"client_id" `
TopicSuffix TopicSuffix ` toml:"topic_suffix" `
RoutingTag string ` toml:"routing_tag" `
RoutingKey string ` toml:"routing_key" `
2020-03-10 20:38:26 +00:00
CompressionCodec int ` toml:"compression_codec" `
RequiredAcks int ` toml:"required_acks" `
MaxRetry int ` toml:"max_retry" `
MaxMessageBytes int ` toml:"max_message_bytes" `
2017-09-06 21:18:26 +00:00
2018-07-31 22:09:30 +00:00
Version string ` toml:"version" `
2018-05-04 23:33:23 +00:00
// Legacy TLS config options
2017-09-06 21:18:26 +00:00
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
2020-01-03 00:27:26 +00:00
EnableTLS * bool ` toml:"enable_tls" `
2018-05-04 23:33:23 +00:00
tlsint . ClientConfig
2017-09-06 21:18:26 +00:00
SASLUsername string ` toml:"sasl_username" `
SASLPassword string ` toml:"sasl_password" `
2020-01-03 00:27:26 +00:00
SASLVersion * int ` toml:"sasl_version" `
2017-09-06 21:18:26 +00:00
2019-12-03 19:48:53 +00:00
Log telegraf . Logger ` toml:"-" `
2017-09-06 21:18:26 +00:00
tlsConfig tls . Config
2020-03-10 20:38:26 +00:00
producerFunc func ( addrs [ ] string , config * sarama . Config ) ( sarama . SyncProducer , error )
producer sarama . SyncProducer
2017-09-06 21:18:26 +00:00
serializer serializers . Serializer
}
TopicSuffix struct {
Method string ` toml:"method" `
Keys [ ] string ` toml:"keys" `
Separator string ` toml:"separator" `
}
)
2019-07-11 20:50:12 +00:00
// DebugLogger logs messages from sarama at the debug level.
//
// Every message is written to the standard logger behind a "D! [sarama]"
// prefix.
type DebugLogger struct {
}

// Print logs v, formatted in the manner of fmt.Print, behind the debug prefix.
func (*DebugLogger) Print(v ...interface{}) {
	args := make([]interface{}, 0, len(v)+1)
	args = append(args, "D! [sarama] ")
	args = append(args, v...)
	log.Print(args...)
}

// Printf logs the message produced by format and v behind the debug prefix.
func (*DebugLogger) Printf(format string, v ...interface{}) {
	log.Printf("D! [sarama] "+format, v...)
}

// Println logs v, formatted in the manner of fmt.Println, behind the debug
// prefix.
func (*DebugLogger) Println(v ...interface{}) {
	args := make([]interface{}, 0, len(v)+1)
	// Println-style formatting already separates operands with a space, so
	// the prefix carries no trailing space here.
	args = append(args, "D! [sarama]")
	args = append(args, v...)
	log.Println(args...)
}
2015-08-26 17:02:10 +00:00
var sampleConfig = `
2016-02-18 21:26:51 +00:00
# # URLs of kafka brokers
2015-10-15 21:53:29 +00:00
brokers = [ "localhost:9092" ]
2016-02-18 21:26:51 +00:00
# # Kafka topic for producer messages
2015-10-15 21:53:29 +00:00
topic = "telegraf"
2018-07-13 20:59:45 +00:00
2020-03-10 20:38:26 +00:00
# # The value of this tag will be used as the topic . If not set the ' topic '
# # option is used .
# topic_tag = ""
# # If true , the ' topic_tag ' will be removed from the metric .
# exclude_topic_tag = false
2018-07-13 20:53:56 +00:00
# # Optional Client id
# client_id = "Telegraf"
2017-09-06 21:18:26 +00:00
2018-07-31 22:09:30 +00:00
# # Set the minimal supported Kafka version . Setting this enables the use of new
2018-08-17 20:51:21 +00:00
# # Kafka features and APIs . Of particular interest , lz4 compression
2018-07-31 22:09:30 +00:00
# # requires at least version 0.10 .0 .0 .
# # ex : version = "1.1.0"
# version = ""
2017-09-06 21:18:26 +00:00
# # Optional topic suffix configuration .
# # If the section is omitted , no suffix is used .
# # Following topic suffix methods are supported :
# # measurement - suffix equals to separator + measurement ' s name
# # tags - suffix equals to separator + specified tags ' values
# # interleaved with separator
# # Suffix equals to "_" + measurement name
# [ outputs . kafka . topic_suffix ]
# method = "measurement"
# separator = "_"
# # Suffix equals to "__" + measurement ' s "foo" tag value .
# # If there ' s no such a tag , suffix equals to an empty string
# [ outputs . kafka . topic_suffix ]
# method = "tags"
# keys = [ "foo" ]
# separator = "__"
# # Suffix equals to "_" + measurement ' s "foo" and "bar"
# # tag values , separated by "_" . If there is no such tags ,
# # their values treated as empty strings .
# [ outputs . kafka . topic_suffix ]
# method = "tags"
# keys = [ "foo" , "bar" ]
# separator = "_"
2020-02-05 00:40:00 +00:00
# # The routing tag specifies a tagkey on the metric whose value is used as
# # the message key . The message key is used to determine which partition to
# # send the message to . This tag is preferred over the routing_key option .
2015-10-15 21:53:29 +00:00
routing_tag = "host"
2016-01-11 12:20:51 +00:00
2020-02-05 00:40:00 +00:00
# # The routing key is set as the message key and used to determine which
# # partition to send the message to . This value is only used when no
# # routing_tag is set or as a fallback when the tag specified in routing tag
# # is not found .
# #
# # If set to "random" , a random value will be generated for each message .
# #
# # When unset , no message key is added and each message is routed to a random
# # partition .
# #
2018-08-21 19:44:10 +00:00
# # ex : routing_key = "random"
# # routing_key = "telegraf"
# routing_key = ""
2016-03-31 23:50:24 +00:00
# # CompressionCodec represents the various compression codecs recognized by
# # Kafka in messages .
2016-03-31 15:27:14 +00:00
# # 0 : No compression
# # 1 : Gzip compression
# # 2 : Snappy compression
2018-08-17 20:51:21 +00:00
# # 3 : LZ4 compression
2018-05-04 00:22:49 +00:00
# compression_codec = 0
2016-03-31 09:14:20 +00:00
2016-03-31 23:50:24 +00:00
# # RequiredAcks is used in Produce Requests to tell the broker how many
# # replica acknowledgements it must see before responding
# # 0 : the producer never waits for an acknowledgement from the broker .
# # This option provides the lowest latency but the weakest durability
# # guarantees ( some data will be lost when a server fails ) .
# # 1 : the producer gets an acknowledgement after the leader replica has
# # received the data . This option provides better durability as the
# # client waits until the server acknowledges the request as successful
# # ( only messages that were written to the now - dead leader but not yet
# # replicated will be lost ) .
# # - 1 : the producer gets an acknowledgement after all in - sync replicas have
# # received the data . This option provides the best durability , we
# # guarantee that no messages will be lost as long as at least one in
# # sync replica remains .
2018-05-04 00:22:49 +00:00
# required_acks = - 1
2016-03-31 09:14:20 +00:00
2018-05-04 00:22:49 +00:00
# # The maximum number of times to retry sending a metric before failing
# # until the next flush .
# max_retry = 3
2016-03-31 09:14:20 +00:00
2018-08-15 21:12:22 +00:00
# # The maximum permitted size of a message . Should be set equal to or
# # smaller than the broker ' s ' message . max . bytes ' .
# max_message_bytes = 1000000
2018-08-13 23:40:18 +00:00
2018-05-04 23:33:23 +00:00
# # Optional TLS Config
2020-01-03 00:27:26 +00:00
# enable_tls = true
2018-05-04 23:33:23 +00:00
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
# # Use TLS but skip chain & host verification
2016-02-03 19:59:34 +00:00
# insecure_skip_verify = false
2016-02-10 22:50:07 +00:00
2017-04-27 18:50:25 +00:00
# # Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
2020-01-03 00:27:26 +00:00
# # SASL protocol version . When connecting to Azure EventHub set to 0.
# sasl_version = 1
2016-03-31 23:50:24 +00:00
# # Data format to output .
2017-04-27 18:06:40 +00:00
# # Each data format has its own unique set of configuration options , read
2016-02-18 21:26:51 +00:00
# # more about them here :
# # https : //github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
2018-05-04 00:22:49 +00:00
# data_format = "influx"
2015-08-26 17:02:10 +00:00
`
2017-09-06 21:18:26 +00:00
func ValidateTopicSuffixMethod ( method string ) error {
for _ , validMethod := range ValidTopicSuffixMethods {
if method == validMethod {
return nil
}
}
2017-09-06 21:19:42 +00:00
return fmt . Errorf ( "Unknown topic suffix method provided: %s" , method )
2017-09-06 21:18:26 +00:00
}
2020-03-10 20:38:26 +00:00
func ( k * Kafka ) GetTopicName ( metric telegraf . Metric ) ( telegraf . Metric , string ) {
topic := k . Topic
if k . TopicTag != "" {
if t , ok := metric . GetTag ( k . TopicTag ) ; ok {
topic = t
// If excluding the topic tag, a copy is required to avoid modifying
// the metric buffer.
if k . ExcludeTopicTag {
metric = metric . Copy ( )
metric . Accept ( )
metric . RemoveTag ( k . TopicTag )
}
}
}
2017-09-06 21:18:26 +00:00
var topicName string
switch k . TopicSuffix . Method {
case "measurement" :
2020-03-10 20:38:26 +00:00
topicName = topic + k . TopicSuffix . Separator + metric . Name ( )
2017-09-06 21:18:26 +00:00
case "tags" :
var topicNameComponents [ ] string
2020-03-10 20:38:26 +00:00
topicNameComponents = append ( topicNameComponents , topic )
2017-09-06 21:18:26 +00:00
for _ , tag := range k . TopicSuffix . Keys {
tagValue := metric . Tags ( ) [ tag ]
if tagValue != "" {
topicNameComponents = append ( topicNameComponents , tagValue )
}
}
topicName = strings . Join ( topicNameComponents , k . TopicSuffix . Separator )
default :
2020-03-10 20:38:26 +00:00
topicName = topic
2017-09-06 21:18:26 +00:00
}
2020-03-10 20:38:26 +00:00
return metric , topicName
2017-09-06 21:18:26 +00:00
}
2016-02-10 22:50:07 +00:00
// SetSerializer stores the serializer that Write uses to encode each metric.
func ( k * Kafka ) SetSerializer ( serializer serializers . Serializer ) {
k . serializer = serializer
}
2016-02-03 19:59:34 +00:00
func ( k * Kafka ) Connect ( ) error {
2017-09-06 21:18:26 +00:00
err := ValidateTopicSuffixMethod ( k . TopicSuffix . Method )
if err != nil {
return err
}
2016-02-03 19:59:34 +00:00
config := sarama . NewConfig ( )
2016-03-31 09:14:20 +00:00
2018-07-31 22:09:30 +00:00
if k . Version != "" {
version , err := sarama . ParseKafkaVersion ( k . Version )
if err != nil {
return err
}
config . Version = version
}
2018-07-13 20:53:56 +00:00
if k . ClientID != "" {
config . ClientID = k . ClientID
} else {
config . ClientID = "Telegraf"
}
2016-03-31 15:27:14 +00:00
config . Producer . RequiredAcks = sarama . RequiredAcks ( k . RequiredAcks )
config . Producer . Compression = sarama . CompressionCodec ( k . CompressionCodec )
config . Producer . Retry . Max = k . MaxRetry
2017-02-01 16:18:39 +00:00
config . Producer . Return . Successes = true
2016-01-11 12:20:51 +00:00
2018-08-13 23:40:18 +00:00
if k . MaxMessageBytes > 0 {
config . Producer . MaxMessageBytes = k . MaxMessageBytes
}
2016-02-03 19:59:34 +00:00
// Legacy support ssl config
if k . Certificate != "" {
2018-05-04 23:33:23 +00:00
k . TLSCert = k . Certificate
k . TLSCA = k . CA
k . TLSKey = k . Key
2016-01-11 12:20:51 +00:00
}
2020-01-03 00:27:26 +00:00
if k . EnableTLS != nil && * k . EnableTLS {
config . Net . TLS . Enable = true
}
2018-05-04 23:33:23 +00:00
tlsConfig , err := k . ClientConfig . TLSConfig ( )
2016-01-11 12:20:51 +00:00
if err != nil {
return err
}
if tlsConfig != nil {
config . Net . TLS . Config = tlsConfig
2020-01-03 00:27:26 +00:00
// To maintain backwards compatibility, if the enable_tls option is not
// set TLS is enabled if a non-default TLS config is used.
if k . EnableTLS == nil {
k . Log . Warnf ( "Use of deprecated configuration: enable_tls should be set when using TLS" )
config . Net . TLS . Enable = true
}
2016-01-11 12:20:51 +00:00
}
2017-04-27 18:50:25 +00:00
if k . SASLUsername != "" && k . SASLPassword != "" {
config . Net . SASL . User = k . SASLUsername
config . Net . SASL . Password = k . SASLPassword
config . Net . SASL . Enable = true
2020-01-03 00:27:26 +00:00
version , err := kafka . SASLVersion ( config . Version , k . SASLVersion )
if err != nil {
return err
}
config . Net . SASL . Version = version
2017-04-27 18:50:25 +00:00
}
2020-03-10 20:38:26 +00:00
producer , err := k . producerFunc ( k . Brokers , config )
2015-08-26 17:02:10 +00:00
if err != nil {
return err
}
k . producer = producer
return nil
}
// Close shuts down the producer created by Connect and returns its error.
//
// When no producer has been assigned (Connect was never called or returned
// an error) there is nothing to close and nil is returned, instead of
// panicking on the nil producer.
func (k *Kafka) Close() error {
	if k.producer == nil {
		return nil
	}
	return k.producer.Close()
}
// SampleConfig returns the sample configuration text held in sampleConfig.
func ( k * Kafka ) SampleConfig ( ) string {
return sampleConfig
}
// Description returns a fixed one-line description of this output.
func ( k * Kafka ) Description ( ) string {
return "Configuration for the Kafka server to send metrics to"
}
2019-11-08 01:39:19 +00:00
func ( k * Kafka ) routingKey ( metric telegraf . Metric ) ( string , error ) {
2018-08-21 19:44:10 +00:00
if k . RoutingTag != "" {
key , ok := metric . GetTag ( k . RoutingTag )
if ok {
2019-11-08 01:39:19 +00:00
return key , nil
2018-08-21 19:44:10 +00:00
}
}
if k . RoutingKey == "random" {
2019-11-08 01:39:19 +00:00
u , err := uuid . NewV4 ( )
if err != nil {
return "" , err
}
return u . String ( ) , nil
2018-08-21 19:44:10 +00:00
}
2019-11-08 01:39:19 +00:00
return k . RoutingKey , nil
2018-08-21 19:44:10 +00:00
}
2016-01-27 23:15:14 +00:00
func ( k * Kafka ) Write ( metrics [ ] telegraf . Metric ) error {
2018-07-31 22:08:04 +00:00
msgs := make ( [ ] * sarama . ProducerMessage , 0 , len ( metrics ) )
2016-02-10 22:50:07 +00:00
for _ , metric := range metrics {
2020-03-10 20:38:26 +00:00
metric , topic := k . GetTopicName ( metric )
2016-12-06 15:38:59 +00:00
buf , err := k . serializer . Serialize ( metric )
2016-02-10 22:50:07 +00:00
if err != nil {
2019-12-03 19:48:53 +00:00
k . Log . Debugf ( "Could not serialize metric: %v" , err )
2019-06-04 00:34:48 +00:00
continue
2015-08-26 17:02:10 +00:00
}
2016-02-10 22:50:07 +00:00
2016-12-06 15:38:59 +00:00
m := & sarama . ProducerMessage {
2020-03-10 20:38:26 +00:00
Topic : topic ,
2020-01-15 23:26:50 +00:00
Value : sarama . ByteEncoder ( buf ) ,
}
// Negative timestamps are not allowed by the Kafka protocol.
if ! metric . Time ( ) . Before ( zeroTime ) {
m . Timestamp = metric . Time ( )
2015-08-26 17:02:10 +00:00
}
2019-11-08 01:39:19 +00:00
key , err := k . routingKey ( metric )
if err != nil {
return fmt . Errorf ( "could not generate routing key: %v" , err )
}
2018-08-21 19:44:10 +00:00
if key != "" {
m . Key = sarama . StringEncoder ( key )
2016-12-06 15:38:59 +00:00
}
2018-07-31 22:08:04 +00:00
msgs = append ( msgs , m )
}
2016-12-06 15:38:59 +00:00
2018-07-31 22:08:04 +00:00
err := k . producer . SendMessages ( msgs )
if err != nil {
// We could have many errors, return only the first encountered.
if errs , ok := err . ( sarama . ProducerErrors ) ; ok {
for _ , prodErr := range errs {
2018-08-17 20:51:21 +00:00
if prodErr . Err == sarama . ErrMessageSizeTooLarge {
2019-12-03 19:48:53 +00:00
k . Log . Error ( "Message too large, consider increasing `max_message_bytes`; dropping batch" )
return nil
}
if prodErr . Err == sarama . ErrInvalidTimestamp {
k . Log . Error ( "The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch" )
2018-08-17 20:51:21 +00:00
return nil
}
2018-07-31 22:08:04 +00:00
return prodErr
}
2015-08-26 17:02:10 +00:00
}
2018-07-31 22:08:04 +00:00
return err
2015-08-26 17:02:10 +00:00
}
2018-07-31 22:08:04 +00:00
2015-08-26 17:02:10 +00:00
return nil
}
func init ( ) {
2019-07-11 20:50:12 +00:00
sarama . Logger = & DebugLogger { }
2016-01-27 21:21:36 +00:00
outputs . Add ( "kafka" , func ( ) telegraf . Output {
2016-04-30 00:48:07 +00:00
return & Kafka {
MaxRetry : 3 ,
RequiredAcks : - 1 ,
2020-03-10 20:38:26 +00:00
producerFunc : sarama . NewSyncProducer ,
2016-04-30 00:48:07 +00:00
}
2015-08-26 17:02:10 +00:00
} )
}