diff --git a/plugins/common/kafka/sasl.go b/plugins/common/kafka/sasl.go new file mode 100644 index 000000000..cd3358b38 --- /dev/null +++ b/plugins/common/kafka/sasl.go @@ -0,0 +1,25 @@ +package kafka + +import ( + "errors" + + "github.com/Shopify/sarama" +) + +func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) { + if saslVersion == nil { + if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) { + return sarama.SASLHandshakeV1, nil + } + return sarama.SASLHandshakeV0, nil + } + + switch *saslVersion { + case 0: + return sarama.SASLHandshakeV0, nil + case 1: + return sarama.SASLHandshakeV1, nil + default: + return 0, errors.New("invalid SASL version") + } +} diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index b0f2a4798..dec39cc32 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -34,10 +34,14 @@ and use the old zookeeper connection method. ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Optional SASL Config + ## SASL authentication credentials. These settings should typically be used + ## with TLS encryption enabled using the "enable_tls" option. # sasl_username = "kafka" # sasl_password = "secret" + ## SASL protocol version. When connecting to Azure EventHub set to 0. + # sasl_version = 1 + ## Name of the consumer group. # consumer_group = "telegraf_metrics_consumers" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 39f6f0e2b..5cd6a9771 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -10,6 +10,7 @@ import ( "github.com/Shopify/sarama" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -33,16 +34,21 @@ const sampleConfig = ` # version = "" ## Optional TLS Config + # enable_tls = true # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Optional SASL Config + ## SASL authentication credentials. These settings should typically be used + ## with TLS encryption enabled using the "enable_tls" option. # sasl_username = "kafka" # sasl_password = "secret" + ## SASL protocol version. When connecting to Azure EventHub set to 0. + # sasl_version = 1 + ## Name of the consumer group. # consumer_group = "telegraf_metrics_consumers" @@ -95,9 +101,13 @@ type KafkaConsumer struct { Version string `toml:"version"` SASLPassword string `toml:"sasl_password"` SASLUsername string `toml:"sasl_username"` + SASLVersion *int `toml:"sasl_version"` + EnableTLS *bool `toml:"enable_tls"` tls.ClientConfig + Log telegraf.Logger `toml:"-"` + ConsumerCreator ConsumerGroupCreator `toml:"-"` consumer ConsumerGroup config *sarama.Config @@ -158,6 +168,10 @@ func (k *KafkaConsumer) Init() error { config.Version = version } + if k.EnableTLS != nil && *k.EnableTLS { + config.Net.TLS.Enable = true + } + tlsConfig, err := k.ClientConfig.TLSConfig() if err != nil { return err @@ -165,13 +179,25 @@ func (k *KafkaConsumer) Init() error { if tlsConfig != nil { config.Net.TLS.Config = tlsConfig - config.Net.TLS.Enable = true + + // To maintain backwards compatibility, if the enable_tls option is not + // set TLS is enabled if a non-default TLS config is used. + if k.EnableTLS == nil { + k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS") + config.Net.TLS.Enable = true + } } if k.SASLUsername != "" && k.SASLPassword != "" { config.Net.SASL.User = k.SASLUsername config.Net.SASL.Password = k.SASLPassword config.Net.SASL.Enable = true + + version, err := kafka.SASLVersion(config.Version, k.SASLVersion) + if err != nil { + return err + } + config.Net.SASL.Version = version } if k.ClientID != "" { diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 3aa0efa50..0c8063578 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -7,6 +7,7 @@ import ( "github.com/Shopify/sarama" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -68,6 +69,7 @@ func TestInit(t *testing.T) { name: "parses valid version string", plugin: &KafkaConsumer{ Version: "1.0.0", + Log: testutil.Logger{}, }, check: func(t *testing.T, plugin *KafkaConsumer) { require.Equal(t, plugin.config.Version, sarama.V1_0_0_0) @@ -77,6 +79,7 @@ func TestInit(t *testing.T) { name: "invalid version string", plugin: &KafkaConsumer{ Version: "100", + Log: testutil.Logger{}, }, initError: true, }, @@ -84,6 +87,7 @@ func TestInit(t *testing.T) { name: "custom client_id", plugin: &KafkaConsumer{ ClientID: "custom", + Log: testutil.Logger{}, }, check: func(t *testing.T, plugin *KafkaConsumer) { require.Equal(t, plugin.config.ClientID, "custom") @@ -93,6 +97,7 @@ func TestInit(t *testing.T) { name: "custom offset", plugin: &KafkaConsumer{ Offset: "newest", + Log: testutil.Logger{}, }, check: func(t *testing.T, plugin *KafkaConsumer) { require.Equal(t, plugin.config.Consumer.Offsets.Initial, sarama.OffsetNewest) @@ -102,9 +107,54 @@ func TestInit(t *testing.T) { name: "invalid offset", plugin: &KafkaConsumer{ Offset: "middle", + Log: testutil.Logger{}, }, initError: true, }, + { + name: "default tls without tls config", + plugin: &KafkaConsumer{ + Log: testutil.Logger{}, + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.False(t, plugin.config.Net.TLS.Enable) + }, + }, + { + name: "default tls with a tls config", + plugin: &KafkaConsumer{ + ClientConfig: tls.ClientConfig{ + InsecureSkipVerify: true, + }, + Log: testutil.Logger{}, + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.True(t, plugin.config.Net.TLS.Enable) + }, + }, + { + name: "disable tls", + plugin: &KafkaConsumer{ + EnableTLS: func() *bool { v := false; return &v }(), + ClientConfig: tls.ClientConfig{ + InsecureSkipVerify: true, + }, + Log: testutil.Logger{}, + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.False(t, plugin.config.Net.TLS.Enable) + }, + }, + { + name: "enable tls", + plugin: &KafkaConsumer{ + EnableTLS: func() *bool { v := true; return &v }(), + Log: testutil.Logger{}, + }, + check: func(t *testing.T, plugin *KafkaConsumer) { + require.True(t, plugin.config.Net.TLS.Enable) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -125,6 +175,7 @@ func TestStartStop(t *testing.T) { cg := &FakeConsumerGroup{errors: make(chan error)} plugin := &KafkaConsumer{ ConsumerCreator: &FakeCreator{ConsumerGroup: cg}, + Log: testutil.Logger{}, } err := plugin.Init() require.NoError(t, err) diff --git a/plugins/inputs/zookeeper/README.md b/plugins/inputs/zookeeper/README.md index c452e8663..23009c519 100644 --- a/plugins/inputs/zookeeper/README.md +++ b/plugins/inputs/zookeeper/README.md @@ -19,7 +19,7 @@ The zookeeper plugin collects variables outputted from the 'mntr' command # timeout = "5s" ## Optional TLS Config - # enable_ssl = true + # enable_tls = true # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 25b173a02..7b9fc0e30 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -96,6 +96,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm # sasl_username = "kafka" # sasl_password = "secret" + ## SASL protocol version. When connecting to Azure EventHub set to 0. + # sasl_version = 1 + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 85eb32a3f..b4e71ef57 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -10,6 +10,7 @@ import ( "github.com/gofrs/uuid" "github.com/influxdata/telegraf" tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -43,12 +44,12 @@ type ( // TLS certificate authority CA string + EnableTLS *bool `toml:"enable_tls"` tlsint.ClientConfig - // SASL Username SASLUsername string `toml:"sasl_username"` - // SASL Password SASLPassword string `toml:"sasl_password"` + SASLVersion *int `toml:"sasl_version"` Log telegraf.Logger `toml:"-"` @@ -170,6 +171,7 @@ var sampleConfig = ` # max_message_bytes = 1000000 ## Optional TLS Config + # enable_tls = true # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" @@ -180,6 +182,9 @@ var sampleConfig = ` # sasl_username = "kafka" # sasl_password = "secret" + ## SASL protocol version. When connecting to Azure EventHub set to 0. + # sasl_version = 1 + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -258,6 +263,10 @@ func (k *Kafka) Connect() error { k.TLSKey = k.Key } + if k.EnableTLS != nil && *k.EnableTLS { + config.Net.TLS.Enable = true + } + tlsConfig, err := k.ClientConfig.TLSConfig() if err != nil { return err @@ -265,13 +274,25 @@ func (k *Kafka) Connect() error { if tlsConfig != nil { config.Net.TLS.Config = tlsConfig - config.Net.TLS.Enable = true + + // To maintain backwards compatibility, if the enable_tls option is not + // set TLS is enabled if a non-default TLS config is used. + if k.EnableTLS == nil { + k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS") + config.Net.TLS.Enable = true + } } if k.SASLUsername != "" && k.SASLPassword != "" { config.Net.SASL.User = k.SASLUsername config.Net.SASL.Password = k.SASLPassword config.Net.SASL.Enable = true + + version, err := kafka.SASLVersion(config.Version, k.SASLVersion) + if err != nil { + return err + } + config.Net.SASL.Version = version } producer, err := sarama.NewSyncProducer(k.Brokers, config)