Drop message batches in kafka output if too large (#4565)
This commit is contained in:
parent
1fafa616d7
commit
886d8cc840
|
@ -120,6 +120,7 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
err := ro.write(batch)
|
err := ro.write(batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ro.failMetrics.Add(batch...)
|
ro.failMetrics.Add(batch...)
|
||||||
|
log.Printf("E! Error writing to output [%s]: %v", ro.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ and use the old zookeeper connection method.
|
||||||
|
|
||||||
## Maximum length of a message to consume, in bytes (default 0/unlimited);
|
## Maximum length of a message to consume, in bytes (default 0/unlimited);
|
||||||
## larger messages are dropped
|
## larger messages are dropped
|
||||||
max_message_len = 65536
|
max_message_len = 1000000
|
||||||
```
|
```
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
|
@ -21,6 +21,7 @@ type Kafka struct {
|
||||||
Topics []string
|
Topics []string
|
||||||
Brokers []string
|
Brokers []string
|
||||||
MaxMessageLen int
|
MaxMessageLen int
|
||||||
|
Version string `toml:"version"`
|
||||||
|
|
||||||
Cluster *cluster.Consumer
|
Cluster *cluster.Consumer
|
||||||
|
|
||||||
|
@ -64,6 +65,12 @@ var sampleConfig = `
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
|
## Set the minimal supported Kafka version. Setting this enables the use of new
|
||||||
|
## Kafka features and APIs. Of particular interest, lz4 compression
|
||||||
|
## requires at least version 0.10.0.0.
|
||||||
|
## ex: version = "1.1.0"
|
||||||
|
# version = ""
|
||||||
|
|
||||||
## Optional TLS Config
|
## Optional TLS Config
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
@ -88,7 +95,7 @@ var sampleConfig = `
|
||||||
|
|
||||||
## Maximum length of a message to consume, in bytes (default 0/unlimited);
|
## Maximum length of a message to consume, in bytes (default 0/unlimited);
|
||||||
## larger messages are dropped
|
## larger messages are dropped
|
||||||
max_message_len = 65536
|
max_message_len = 1000000
|
||||||
`
|
`
|
||||||
|
|
||||||
func (k *Kafka) SampleConfig() string {
|
func (k *Kafka) SampleConfig() string {
|
||||||
|
@ -111,6 +118,15 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
|
||||||
k.acc = acc
|
k.acc = acc
|
||||||
|
|
||||||
config := cluster.NewConfig()
|
config := cluster.NewConfig()
|
||||||
|
|
||||||
|
if k.Version != "" {
|
||||||
|
version, err := sarama.ParseKafkaVersion(k.Version)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
config.Version = version
|
||||||
|
}
|
||||||
|
|
||||||
config.Consumer.Return.Errors = true
|
config.Consumer.Return.Errors = true
|
||||||
|
|
||||||
tlsConfig, err := k.ClientConfig.TLSConfig()
|
tlsConfig, err := k.ClientConfig.TLSConfig()
|
||||||
|
|
|
@ -55,6 +55,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
|
||||||
## 0 : No compression
|
## 0 : No compression
|
||||||
## 1 : Gzip compression
|
## 1 : Gzip compression
|
||||||
## 2 : Snappy compression
|
## 2 : Snappy compression
|
||||||
|
## 3 : LZ4 compression
|
||||||
# compression_codec = 0
|
# compression_codec = 0
|
||||||
|
|
||||||
## RequiredAcks is used in Produce Requests to tell the broker how many
|
## RequiredAcks is used in Produce Requests to tell the broker how many
|
||||||
|
|
|
@ -3,6 +3,7 @@ package kafka
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
@ -79,7 +80,7 @@ var sampleConfig = `
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
## Set the minimal supported Kafka version. Setting this enables the use of new
|
## Set the minimal supported Kafka version. Setting this enables the use of new
|
||||||
## Kafka features and APIs. Of particular interested, lz4 compression
|
## Kafka features and APIs. Of particular interest, lz4 compression
|
||||||
## requires at least version 0.10.0.0.
|
## requires at least version 0.10.0.0.
|
||||||
## ex: version = "1.1.0"
|
## ex: version = "1.1.0"
|
||||||
# version = ""
|
# version = ""
|
||||||
|
@ -120,6 +121,7 @@ var sampleConfig = `
|
||||||
## 0 : No compression
|
## 0 : No compression
|
||||||
## 1 : Gzip compression
|
## 1 : Gzip compression
|
||||||
## 2 : Snappy compression
|
## 2 : Snappy compression
|
||||||
|
## 3 : LZ4 compression
|
||||||
# compression_codec = 0
|
# compression_codec = 0
|
||||||
|
|
||||||
## RequiredAcks is used in Produce Requests to tell the broker how many
|
## RequiredAcks is used in Produce Requests to tell the broker how many
|
||||||
|
@ -294,6 +296,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
// We could have many errors, return only the first encountered.
|
// We could have many errors, return only the first encountered.
|
||||||
if errs, ok := err.(sarama.ProducerErrors); ok {
|
if errs, ok := err.(sarama.ProducerErrors); ok {
|
||||||
for _, prodErr := range errs {
|
for _, prodErr := range errs {
|
||||||
|
if prodErr.Err == sarama.ErrMessageSizeTooLarge {
|
||||||
|
log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return prodErr
|
return prodErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue