Add timestamps to points in Kafka/AMQP outputs

This commit is contained in:
Eugene Dementiev 2015-09-23 20:02:34 +03:00 committed by Cameron Sparr
parent 1d741cbfc5
commit 74b3309225
2 changed files with 18 additions and 0 deletions

View File

@ -2,6 +2,7 @@ package amqp
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/outputs"
@ -72,6 +73,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error {
return nil return nil
} }
var zero_time time.Time
for _, p := range bp.Points { for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to AMQP // line-protocol output string to write to AMQP
@ -85,6 +87,13 @@ func (q *AMQP) Write(bp client.BatchPoints) error {
} }
p.Tags[k] = v p.Tags[k] = v
} }
if p.Time == zero_time {
if bp.Time == zero_time {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
value = p.MarshalString() value = p.MarshalString()
} }

View File

@ -3,6 +3,7 @@ package kafka
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/client"
@ -56,6 +57,7 @@ func (k *Kafka) Write(bp client.BatchPoints) error {
return nil return nil
} }
var zero_time time.Time
for _, p := range bp.Points { for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to Kafka // line-protocol output string to write to Kafka
@ -69,6 +71,13 @@ func (k *Kafka) Write(bp client.BatchPoints) error {
} }
p.Tags[k] = v p.Tags[k] = v
} }
if p.Time == zero_time {
if bp.Time == zero_time {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
value = p.MarshalString() value = p.MarshalString()
} }