diff --git a/internal/models/running_output.go b/internal/models/running_output.go index a0d1f6b03..25576d745 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -120,6 +120,7 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { err := ro.write(batch) if err != nil { ro.failMetrics.Add(batch...) + log.Printf("E! Error writing to output [%s]: %v", ro.Name, err) } } } diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 24a0efc0f..b6fc8fc89 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -44,7 +44,7 @@ and use the old zookeeper connection method. ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped - max_message_len = 65536 + max_message_len = 1000000 ``` ## Testing diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index d3791b224..eba9b68ac 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -21,6 +21,7 @@ type Kafka struct { Topics []string Brokers []string MaxMessageLen int + Version string `toml:"version"` Cluster *cluster.Consumer @@ -64,6 +65,12 @@ var sampleConfig = ` ## Optional Client id # client_id = "Telegraf" + ## Set the minimal supported Kafka version. Setting this enables the use of new + ## Kafka features and APIs. Of particular interest, lz4 compression + ## requires at least version 0.10.0.0. + ## ex: version = "1.1.0" + # version = "" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -88,7 +95,7 @@ var sampleConfig = ` ## Maximum length of a message to consume, in bytes (default 0/unlimited); ## larger messages are dropped - max_message_len = 65536 + max_message_len = 1000000 ` func (k *Kafka) SampleConfig() string { @@ -111,6 +118,15 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := cluster.NewConfig() + + if k.Version != "" { + version, err := sarama.ParseKafkaVersion(k.Version) + if err != nil { + return err + } + config.Version = version + } + config.Consumer.Return.Errors = true tlsConfig, err := k.ClientConfig.TLSConfig() diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index bb410a1d5..5f4758baa 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -55,6 +55,7 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression + ## 3 : LZ4 compression # compression_codec = 0 ## RequiredAcks is used in Produce Requests to tell the broker how many diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0bb5ca4a3..b9ae35396 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "crypto/tls" "fmt" + "log" "strings" "github.com/influxdata/telegraf" @@ -79,7 +80,7 @@ var sampleConfig = ` # client_id = "Telegraf" ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Of particular interested, lz4 compression + ## Kafka features and APIs. Of particular interest, lz4 compression ## requires at least version 0.10.0.0. ## ex: version = "1.1.0" # version = "" @@ -120,6 +121,7 @@ var sampleConfig = ` ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression + ## 3 : LZ4 compression # compression_codec = 0 ## RequiredAcks is used in Produce Requests to tell the broker how many @@ -294,6 +296,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { // We could have many errors, return only the first encountered. if errs, ok := err.(sarama.ProducerErrors); ok { for _, prodErr := range errs { + if prodErr.Err == sarama.ErrMessageSizeTooLarge { + log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch") + return nil + } return prodErr } }