Add support for setting kafka client id (#4418)
This commit is contained in:
parent
0da94a1b3c
commit
c8f00030dd
|
@ -22,6 +22,9 @@ and use the old zookeeper connection method.
|
||||||
## Offset (must be either "oldest" or "newest")
|
## Offset (must be either "oldest" or "newest")
|
||||||
offset = "oldest"
|
offset = "oldest"
|
||||||
|
|
||||||
|
## Optional client id
|
||||||
|
# client_id = "my_client"
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
|
|
||||||
type Kafka struct {
|
type Kafka struct {
|
||||||
ConsumerGroup string
|
ConsumerGroup string
|
||||||
|
ClientID string `toml:"client_id"`
|
||||||
Topics []string
|
Topics []string
|
||||||
Brokers []string
|
Brokers []string
|
||||||
MaxMessageLen int
|
MaxMessageLen int
|
||||||
|
@ -60,6 +61,9 @@ var sampleConfig = `
|
||||||
## topic(s) to consume
|
## topic(s) to consume
|
||||||
topics = ["telegraf"]
|
topics = ["telegraf"]
|
||||||
|
|
||||||
|
## Optional Client id
|
||||||
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
@ -114,6 +118,12 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if k.ClientID != "" {
|
||||||
|
config.ClientID = k.ClientID
|
||||||
|
} else {
|
||||||
|
config.ClientID = "Telegraf"
|
||||||
|
}
|
||||||
|
|
||||||
if tlsConfig != nil {
|
if tlsConfig != nil {
|
||||||
log.Printf("D! TLS Enabled")
|
log.Printf("D! TLS Enabled")
|
||||||
config.Net.TLS.Config = tlsConfig
|
config.Net.TLS.Config = tlsConfig
|
||||||
|
|
|
@ -10,6 +10,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
## Kafka topic for producer messages
|
## Kafka topic for producer messages
|
||||||
topic = "telegraf"
|
topic = "telegraf"
|
||||||
|
|
||||||
|
## Optional client id
|
||||||
|
# client_id = "my_client"
|
||||||
|
|
||||||
## Optional topic suffix configuration.
|
## Optional topic suffix configuration.
|
||||||
## If the section is omitted, no suffix is used.
|
## If the section is omitted, no suffix is used.
|
||||||
## Following topic suffix methods are supported:
|
## Following topic suffix methods are supported:
|
||||||
|
|
|
@ -25,6 +25,8 @@ type (
|
||||||
Brokers []string
|
Brokers []string
|
||||||
// Kafka topic
|
// Kafka topic
|
||||||
Topic string
|
Topic string
|
||||||
|
// Kafka client id
|
||||||
|
ClientID string `toml:"client_id"`
|
||||||
// Kafka topic suffix option
|
// Kafka topic suffix option
|
||||||
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
||||||
// Routing Key Tag
|
// Routing Key Tag
|
||||||
|
@ -69,6 +71,9 @@ var sampleConfig = `
|
||||||
## Kafka topic for producer messages
|
## Kafka topic for producer messages
|
||||||
topic = "telegraf"
|
topic = "telegraf"
|
||||||
|
|
||||||
|
## Optional Client id
|
||||||
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
## Optional topic suffix configuration.
|
## Optional topic suffix configuration.
|
||||||
## If the section is omitted, no suffix is used.
|
## If the section is omitted, no suffix is used.
|
||||||
## Following topic suffix methods are supported:
|
## Following topic suffix methods are supported:
|
||||||
|
@ -186,6 +191,12 @@ func (k *Kafka) Connect() error {
|
||||||
}
|
}
|
||||||
config := sarama.NewConfig()
|
config := sarama.NewConfig()
|
||||||
|
|
||||||
|
if k.ClientID != "" {
|
||||||
|
config.ClientID = k.ClientID
|
||||||
|
} else {
|
||||||
|
config.ClientID = "Telegraf"
|
||||||
|
}
|
||||||
|
|
||||||
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
|
config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
|
||||||
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
|
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
|
||||||
config.Producer.Retry.Max = k.MaxRetry
|
config.Producer.Retry.Max = k.MaxRetry
|
||||||
|
|
Loading…
Reference in New Issue