From 74b330922527510c209645474b7ac664c1b2b9d9 Mon Sep 17 00:00:00 2001 From: Eugene Dementiev Date: Wed, 23 Sep 2015 20:02:34 +0300 Subject: [PATCH] Add timestamps to points in Kafka/AMQP outputs --- outputs/amqp/amqp.go | 9 +++++++++ outputs/kafka/kafka.go | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index 070793fdb..9ca2783f1 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -2,6 +2,7 @@ package amqp import ( "fmt" + "time" "github.com/influxdb/influxdb/client" "github.com/influxdb/telegraf/outputs" @@ -72,6 +73,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error { return nil } + var zero_time time.Time for _, p := range bp.Points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to AMQP @@ -85,6 +87,13 @@ func (q *AMQP) Write(bp client.BatchPoints) error { } p.Tags[k] = v } + if p.Time == zero_time { + if bp.Time == zero_time { + p.Time = time.Now() + } else { + p.Time = bp.Time + } + } value = p.MarshalString() } diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go index 370f1ba95..ce9946ba3 100644 --- a/outputs/kafka/kafka.go +++ b/outputs/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "errors" "fmt" + "time" "github.com/Shopify/sarama" "github.com/influxdb/influxdb/client" @@ -56,6 +57,7 @@ func (k *Kafka) Write(bp client.BatchPoints) error { return nil } + var zero_time time.Time for _, p := range bp.Points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to Kafka @@ -69,6 +71,13 @@ func (k *Kafka) Write(bp client.BatchPoints) error { } p.Tags[k] = v } + if p.Time == zero_time { + if bp.Time == zero_time { + p.Time = time.Now() + } else { + p.Time = bp.Time + } + } value = p.MarshalString() }