diff --git a/CHANGELOG.md b/CHANGELOG.md index af9444b81..25e5b3daa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features - [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. +- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. ### Bugfixes diff --git a/accumulator.go b/accumulator.go index cbea58ebf..15c5485f8 100644 --- a/accumulator.go +++ b/accumulator.go @@ -18,4 +18,8 @@ type Accumulator interface { Debug() bool SetDebug(enabled bool) + + SetPrecision(precision, interval time.Duration) + + DisablePrecision() } diff --git a/agent/accumulator.go b/agent/accumulator.go index d6ff8de60..504731720 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -17,6 +17,7 @@ func NewAccumulator( acc := accumulator{} acc.metrics = metrics acc.inputConfig = inputConfig + acc.precision = time.Nanosecond return &acc } @@ -32,6 +33,8 @@ type accumulator struct { inputConfig *internal_models.InputConfig prefix string + + precision time.Duration } func (ac *accumulator) Add( @@ -141,6 +144,7 @@ func (ac *accumulator) AddFields( } else { timestamp = time.Now() } + timestamp = timestamp.Round(ac.precision) if ac.prefix != "" { measurement = ac.prefix + measurement @@ -173,6 +177,31 @@ func (ac *accumulator) SetTrace(trace bool) { ac.trace = trace } +// SetPrecision takes two time.Duration objects. If the first is non-zero, +// it sets that as the precision. Otherwise, it takes the second argument +// as the order of time that the metrics should be rounded to, with the +// maximum being 1s. 
+func (ac *accumulator) SetPrecision(precision, interval time.Duration) { + if precision > 0 { + ac.precision = precision + return + } + switch { + case interval >= time.Second: + ac.precision = time.Second + case interval >= time.Millisecond: + ac.precision = time.Millisecond + case interval >= time.Microsecond: + ac.precision = time.Microsecond + default: + ac.precision = time.Nanosecond + } +} + +func (ac *accumulator) DisablePrecision() { + ac.precision = time.Nanosecond +} + func (ac *accumulator) setDefaultTags(tags map[string]string) { ac.defaultTags = tags } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ee8f65e48..9bf681192 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -38,6 +38,128 @@ func TestAdd(t *testing.T) { actual) } +func TestAddNoPrecisionWithInterval(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(0, time.Second) + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) +} + +func TestAddNoIntervalWithPrecision(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(time.Second, time.Millisecond) + a.Add("acctest", 
float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) +} + +func TestAddDisablePrecision(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(time.Second, time.Millisecond) + a.DisablePrecision() + a.Add("acctest", float64(101), map[string]string{}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), + actual) +} + +func TestDifferentPrecisions(t *testing.T) { + a := accumulator{} + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + a.metrics = make(chan telegraf.Metric, 10) + defer close(a.metrics) + a.inputConfig = &internal_models.InputConfig{} + + a.SetPrecision(0, time.Second) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm := <-a.metrics + actual := testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) + + a.SetPrecision(0, 
time.Millisecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800083000000)), + actual) + + a.SetPrecision(0, time.Microsecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082913000)), + actual) + + a.SetPrecision(0, time.Nanosecond) + a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), + actual) +} + func TestAddDefaultTags(t *testing.T) { a := accumulator{} a.addDefaultTag("default", "tag") diff --git a/agent/agent.go b/agent/agent.go index 1423ef773..d1d36186e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -118,6 +118,8 @@ func (a *Agent) gatherer( acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) acc.setDefaultTags(a.Config.Tags) internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) @@ -201,6 +203,8 @@ func (a *Agent) Test() error { for _, input := range a.Config.Inputs { acc := NewAccumulator(input.Config, metricC) acc.SetTrace(true) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) acc.setDefaultTags(a.Config.Tags) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) @@ -289,6 +293,9 @@ func (a *Agent) Run(shutdown chan struct{}) error { case telegraf.ServiceInput: acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) + // Service input plugins should set their own precision of their + // metrics. 
+ acc.DisablePrecision() acc.setDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 251925589..8192bd12e 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -52,6 +52,11 @@ ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". + precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode @@ -75,9 +80,6 @@ urls = ["http://localhost:8086"] # required ## The target database for metrics (telegraf will create it if not exists). database = "telegraf" # required - ## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". - ## note: using "s" precision greatly improves InfluxDB compression. - precision = "s" ## Retention policy to write to. retention_policy = "default" diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753..99db2e30d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -77,6 +77,14 @@ type AgentConfig struct { // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool + // By default, precision will be set to the same timestamp order as the + // collection interval, with the maximum being 1s. + // ie, when interval = "10s", precision will be "1s" + // when interval = "250ms", precision will be "1ms" + // Precision will NOT be used for service inputs. It is up to each individual + // service input to set the timestamp at the appropriate precision. + Precision internal.Duration + // CollectionJitter is used to jitter the collection by a random amount. 
// Each plugin will sleep for a random time within jitter before collecting. // This can be used to avoid many plugins querying things like sysfs at the @@ -108,11 +116,10 @@ type AgentConfig struct { // does _not_ deactivate FlushInterval. FlushBufferWhenFull bool - // TODO(cam): Remove UTC and Precision parameters, they are no longer + // TODO(cam): Remove the UTC parameter, it is no longer // valid for the agent config. Leaving them here for now for backwards- // compatability - UTC bool `toml:"utc"` - Precision string + UTC bool `toml:"utc"` // Debug is the option for running in debug mode Debug bool @@ -209,6 +216,11 @@ var header = `# Telegraf Configuration ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". 
+ precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode diff --git a/metric.go b/metric.go index 574565c22..0d186784a 100644 --- a/metric.go +++ b/metric.go @@ -45,14 +45,9 @@ func NewMetric( name string, tags map[string]string, fields map[string]interface{}, - t ...time.Time, + t time.Time, ) (Metric, error) { - var T time.Time - if len(t) > 0 { - T = t[0] - } - - pt, err := client.NewPoint(name, tags, fields, T) + pt, err := client.NewPoint(name, tags, fields, t) if err != nil { return nil, err } diff --git a/metric_test.go b/metric_test.go index 1177ab494..4182c9cc1 100644 --- a/metric_test.go +++ b/metric_test.go @@ -51,23 +51,6 @@ func TestNewMetricString(t *testing.T) { assert.Equal(t, lineProtoPrecision, m.PrecisionString("s")) } -func TestNewMetricStringNoTime(t *testing.T) { - tags := map[string]string{ - "host": "localhost", - } - fields := map[string]interface{}{ - "usage_idle": float64(99), - } - m, err := NewMetric("cpu", tags, fields) - assert.NoError(t, err) - - lineProto := fmt.Sprintf("cpu,host=localhost usage_idle=99") - assert.Equal(t, lineProto, m.String()) - - lineProtoPrecision := fmt.Sprintf("cpu,host=localhost usage_idle=99") - assert.Equal(t, lineProtoPrecision, m.PrecisionString("s")) -} - func TestNewMetricFailNaN(t *testing.T) { now := time.Now() diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index babd25753..e8a7c0892 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -10,6 +10,7 @@ import ( "io" "math" "mime" + "time" "github.com/influxdata/telegraf" @@ -88,7 +89,13 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { } // converting to telegraf metric if len(fields) > 0 { - metric, err := telegraf.NewMetric(metricName, tags, fields) + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = time.Now() + } + metric, err := 
telegraf.NewMetric(metricName, tags, fields, t) if err == nil { metrics = append(metrics, metric) } diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index f359b8fab..d2c0523c7 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -24,7 +24,6 @@ type InfluxDB struct { Password string Database string UserAgent string - Precision string RetentionPolicy string WriteConsistency string Timeout internal.Duration @@ -39,6 +38,9 @@ type InfluxDB struct { // Use SSL but skip chain & host verification InsecureSkipVerify bool + // Precision is only here for legacy support. It will be ignored. + Precision string + conns []client.Client } @@ -50,9 +52,6 @@ var sampleConfig = ` urls = ["http://localhost:8086"] # required ## The target database for metrics (telegraf will create it if not exists). database = "telegraf" # required - ## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". - ## note: using "s" precision greatly improves InfluxDB compression. - precision = "s" ## Retention policy to write to. 
retention_policy = "default" @@ -184,7 +183,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { } bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: i.Database, - Precision: i.Precision, RetentionPolicy: i.RetentionPolicy, WriteConsistency: i.WriteConsistency, }) diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 15ed7b7e4..14aee13d9 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -17,6 +17,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } + now := time.Now() pTesting = &PrometheusClient{Listen: "localhost:9127"} err := pTesting.Start() time.Sleep(time.Millisecond * 200) @@ -30,11 +31,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt1, _ := telegraf.NewMetric( "test_point_1", tags, - map[string]interface{}{"value": 0.0}) + map[string]interface{}{"value": 0.0}, + now) pt2, _ := telegraf.NewMetric( "test_point_2", tags, - map[string]interface{}{"value": 1.0}) + map[string]interface{}{"value": 1.0}, + now) var metrics = []telegraf.Metric{ pt1, pt2, @@ -63,11 +66,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt3, _ := telegraf.NewMetric( "test_point_3", tags, - map[string]interface{}{"value": 0.0}) + map[string]interface{}{"value": 0.0}, + now) pt4, _ := telegraf.NewMetric( "test_point_4", tags, - map[string]interface{}{"value": 1.0}) + map[string]interface{}{"value": 1.0}, + now) metrics = []telegraf.Metric{ pt3, pt4, diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 9b6fb2373..1058faf83 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -84,6 +84,14 @@ func (a *Accumulator) AddFields( a.Metrics = append(a.Metrics, p) } +func (a *Accumulator) SetPrecision(precision, interval time.Duration) { + return +} + +func 
(a *Accumulator) DisablePrecision() { + return +} + func (a *Accumulator) Debug() bool { // stub for implementing Accumulator interface. return a.debug