From 7497a2027baf59ff1ee3655745f141fbc4f559ba Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 26 Dec 2018 19:36:10 -0800 Subject: [PATCH] Deliver empty metric tracking group immediately (#5176) --- agent/accumulator_test.go | 16 +++++++++++++++- metric/tracking.go | 28 ++++++++++++++++++---------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 2bb08920f..316ad124b 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -127,6 +126,21 @@ func TestSetPrecision(t *testing.T) { } } +func TestAddTrackingMetricGroupEmpty(t *testing.T) { + ch := make(chan telegraf.Metric, 10) + metrics := []telegraf.Metric{} + acc := NewAccumulator(&TestMetricMaker{}, ch).WithTracking(1) + + id := acc.AddTrackingMetricGroup(metrics) + + select { + case tracking := <-acc.Delivered(): + require.Equal(t, tracking.ID(), id) + default: + t.Fatal("empty group should be delivered immediately") + } +} + type TestMetricMaker struct { } diff --git a/metric/tracking.go b/metric/tracking.go index 83c3c7aec..3d8843240 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -50,7 +50,7 @@ type trackingData struct { rc int32 acceptCount int32 rejectCount int32 - notify NotifyFunc + notifyFunc NotifyFunc } func (d *trackingData) incr() { @@ -69,6 +69,16 @@ func (d *trackingData) reject() { atomic.AddInt32(&d.rejectCount, 1) } +func (d *trackingData) notify() { + d.notifyFunc( + &deliveryInfo{ + id: d.id, + accepted: int(d.acceptCount), + rejected: int(d.rejectCount), + }, + ) +} + type trackingMetric struct { telegraf.Metric d *trackingData @@ -82,7 +92,7 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, rc: 1, acceptCount: 0, rejectCount: 0, - notify: fn, + notifyFunc: fn, }, } @@ -98,7 +108,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf. rc: 0, acceptCount: 0, rejectCount: 0, - notify: fn, + notifyFunc: fn, } for i, m := range group { @@ -114,6 +124,10 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf. runtime.SetFinalizer(d, finalizer) } + if len(group) == 0 { + d.notify() + } + return group, d.id } @@ -146,13 +160,7 @@ func (m *trackingMetric) decr() { } if v == 0 { - m.d.notify( - &deliveryInfo{ - id: m.d.id, - accepted: int(m.d.acceptCount), - rejected: int(m.d.rejectCount), - }, - ) + m.d.notify() } }