Add precision rounding to accumulator

Adding precision rounding to the accumulator. This means that now every
input metric will get rounded at collection, rather than at write (and
only for the influxdb output).

This feature is disabled for service inputs, because service inputs
should be in control of their own timestamps & precisions.
This commit is contained in:
Cameron Sparr
2016-06-13 15:21:11 +01:00
parent 4d242836ee
commit d7efb7a71d
13 changed files with 213 additions and 40 deletions

View File

@@ -10,6 +10,7 @@ import (
"io"
"math"
"mime"
"time"
"github.com/influxdata/telegraf"
@@ -88,7 +89,13 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}
// converting to telegraf metric
if len(fields) > 0 {
metric, err := telegraf.NewMetric(metricName, tags, fields)
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}

View File

@@ -24,7 +24,6 @@ type InfluxDB struct {
Password string
Database string
UserAgent string
Precision string
RetentionPolicy string
WriteConsistency string
Timeout internal.Duration
@@ -39,6 +38,9 @@ type InfluxDB struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// Precision is only here for legacy support. It will be ignored.
Precision string
conns []client.Client
}
@@ -50,9 +52,6 @@ var sampleConfig = `
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
## note: using "s" precision greatly improves InfluxDB compression.
precision = "s"
## Retention policy to write to.
retention_policy = "default"
@@ -184,7 +183,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
}
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
Precision: i.Precision,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})

View File

@@ -17,6 +17,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
now := time.Now()
pTesting = &PrometheusClient{Listen: "localhost:9127"}
err := pTesting.Start()
time.Sleep(time.Millisecond * 200)
@@ -30,11 +31,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt1, _ := telegraf.NewMetric(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0})
map[string]interface{}{"value": 0.0},
now)
pt2, _ := telegraf.NewMetric(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0})
map[string]interface{}{"value": 1.0},
now)
var metrics = []telegraf.Metric{
pt1,
pt2,
@@ -63,11 +66,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt3, _ := telegraf.NewMetric(
"test_point_3",
tags,
map[string]interface{}{"value": 0.0})
map[string]interface{}{"value": 0.0},
now)
pt4, _ := telegraf.NewMetric(
"test_point_4",
tags,
map[string]interface{}{"value": 1.0})
map[string]interface{}{"value": 1.0},
now)
metrics = []telegraf.Metric{
pt3,
pt4,