From 317c823bfc873fa0c74caca394ca7c6d86c3c708 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 3 Dec 2019 11:48:53 -0800 Subject: [PATCH] Set message timestamp to the metric time in kafka output (#6746) --- plugins/outputs/kafka/kafka.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0c967819f..85eb32a3f 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -50,6 +50,8 @@ type ( // SASL Password SASLPassword string `toml:"sasl_password"` + Log telegraf.Logger `toml:"-"` + tlsConfig tls.Config producer sarama.SyncProducer @@ -316,13 +318,14 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := k.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.kafka] Could not serialize metric: %v", err) + k.Log.Debugf("Could not serialize metric: %v", err) continue } m := &sarama.ProducerMessage{ - Topic: k.GetTopicName(metric), - Value: sarama.ByteEncoder(buf), + Topic: k.GetTopicName(metric), + Value: sarama.ByteEncoder(buf), + Timestamp: metric.Time(), } key, err := k.routingKey(metric) @@ -342,7 +345,11 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { if errs, ok := err.(sarama.ProducerErrors); ok { for _, prodErr := range errs { if prodErr.Err == sarama.ErrMessageSizeTooLarge { - log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch") + k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch") + return nil + } + if prodErr.Err == sarama.ErrInvalidTimestamp { + k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch") return nil } return prodErr