diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index b4e71ef57..18a8925a5 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/Shopify/sarama" "github.com/gofrs/uuid" @@ -21,6 +22,8 @@ var ValidTopicSuffixMethods = []string{ "tags", } +var zeroTime = time.Unix(0, 0) + type ( Kafka struct { Brokers []string @@ -344,9 +347,13 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } m := &sarama.ProducerMessage{ - Topic: k.GetTopicName(metric), - Value: sarama.ByteEncoder(buf), - Timestamp: metric.Time(), + Topic: k.GetTopicName(metric), + Value: sarama.ByteEncoder(buf), + } + + // Negative timestamps are not allowed by the Kafka protocol. + if !metric.Time().Before(zeroTime) { + m.Timestamp = metric.Time() } key, err := k.routingKey(metric)