Do not add invalid timestamps to kafka messages (#6908)
This commit is contained in:
parent
68925ed1ef
commit
f6b302621e
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
|
@ -21,6 +22,8 @@ var ValidTopicSuffixMethods = []string{
|
||||||
"tags",
|
"tags",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var zeroTime = time.Unix(0, 0)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Kafka struct {
|
Kafka struct {
|
||||||
Brokers []string
|
Brokers []string
|
||||||
|
@ -346,7 +349,11 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
m := &sarama.ProducerMessage{
|
m := &sarama.ProducerMessage{
|
||||||
Topic: k.GetTopicName(metric),
|
Topic: k.GetTopicName(metric),
|
||||||
Value: sarama.ByteEncoder(buf),
|
Value: sarama.ByteEncoder(buf),
|
||||||
Timestamp: metric.Time(),
|
}
|
||||||
|
|
||||||
|
// Negative timestamps are not allowed by the Kafka protocol.
|
||||||
|
if !metric.Time().Before(zeroTime) {
|
||||||
|
m.Timestamp = metric.Time()
|
||||||
}
|
}
|
||||||
|
|
||||||
key, err := k.routingKey(metric)
|
key, err := k.routingKey(metric)
|
||||||
|
|
Loading…
Reference in New Issue