From 61513c64b85b2cb325a49b930f5ae6ff44af4091 Mon Sep 17 00:00:00 2001 From: Mauro Murari Date: Mon, 13 Aug 2018 20:40:18 -0300 Subject: [PATCH] Add message 'max_bytes' configuration (#4537) --- plugins/outputs/kafka/kafka.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 5fdb8d857..dc8d846da 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -37,6 +37,8 @@ type ( RequiredAcks int // MaxRetry Tag MaxRetry int + // Max Message Bytes + MaxMessageBytes int Version string `toml:"version"` @@ -140,6 +142,9 @@ var sampleConfig = ` ## until the next flush. # max_retry = 3 + ## Max message bytes, should be lower than server message.max.bytes config + # MaxMessageBytes = 0 + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -218,6 +223,10 @@ func (k *Kafka) Connect() error { config.Producer.Retry.Max = k.MaxRetry config.Producer.Return.Successes = true + if k.MaxMessageBytes > 0 { + config.Producer.MaxMessageBytes = k.MaxMessageBytes + } + // Legacy support ssl config if k.Certificate != "" { k.TLSCert = k.Certificate