Set message timestamp to the metric time in kafka output (#6746)
This commit is contained in:
parent
cdb00d6fe7
commit
317c823bfc
|
@ -50,6 +50,8 @@ type (
|
||||||
// SASL Password
|
// SASL Password
|
||||||
SASLPassword string `toml:"sasl_password"`
|
SASLPassword string `toml:"sasl_password"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
tlsConfig tls.Config
|
tlsConfig tls.Config
|
||||||
producer sarama.SyncProducer
|
producer sarama.SyncProducer
|
||||||
|
|
||||||
|
@ -316,13 +318,14 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
buf, err := k.serializer.Serialize(metric)
|
buf, err := k.serializer.Serialize(metric)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("D! [outputs.kafka] Could not serialize metric: %v", err)
|
k.Log.Debugf("Could not serialize metric: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &sarama.ProducerMessage{
|
m := &sarama.ProducerMessage{
|
||||||
Topic: k.GetTopicName(metric),
|
Topic: k.GetTopicName(metric),
|
||||||
Value: sarama.ByteEncoder(buf),
|
Value: sarama.ByteEncoder(buf),
|
||||||
|
Timestamp: metric.Time(),
|
||||||
}
|
}
|
||||||
|
|
||||||
key, err := k.routingKey(metric)
|
key, err := k.routingKey(metric)
|
||||||
|
@ -342,7 +345,11 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
if errs, ok := err.(sarama.ProducerErrors); ok {
|
if errs, ok := err.(sarama.ProducerErrors); ok {
|
||||||
for _, prodErr := range errs {
|
for _, prodErr := range errs {
|
||||||
if prodErr.Err == sarama.ErrMessageSizeTooLarge {
|
if prodErr.Err == sarama.ErrMessageSizeTooLarge {
|
||||||
log.Printf("E! Error writing to output [kafka]: Message too large, consider increasing `max_message_bytes`; dropping batch")
|
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if prodErr.Err == sarama.ErrInvalidTimestamp {
|
||||||
|
k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return prodErr
|
return prodErr
|
||||||
|
|
Loading…
Reference in New Issue