diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0c967819f..85eb32a3f 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -50,6 +50,8 @@ type ( // SASL Password SASLPassword string `toml:"sasl_password"` + Log telegraf.Logger `toml:"-"` + tlsConfig tls.Config producer sarama.SyncProducer @@ -316,13 +318,14 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := k.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.kafka] Could not serialize metric: %v", err) + k.Log.Debugf("Could not serialize metric: %v", err) continue } m := &sarama.ProducerMessage{ - Topic: k.GetTopicName(metric), - Value: sarama.ByteEncoder(buf), + Topic: k.GetTopicName(metric), + Value: sarama.ByteEncoder(buf), + Timestamp: metric.Time(), } key, err := k.routingKey(metric) @@ -342,7 +345,11 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { if errs, ok := err.(sarama.ProducerErrors); ok { for _, prodErr := range errs { if prodErr.Err == sarama.ErrMessageSizeTooLarge { - log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch") + k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch") + return nil + } + if prodErr.Err == sarama.ErrInvalidTimestamp { + k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch") return nil } return prodErr