Deliver empty metric tracking group immediately (#5176)

This commit is contained in:
Daniel Nelson 2018-12-26 19:36:10 -08:00 committed by GitHub
parent 72089042be
commit 7497a2027b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 11 deletions

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -127,6 +126,21 @@ func TestSetPrecision(t *testing.T) {
} }
} }
func TestAddTrackingMetricGroupEmpty(t *testing.T) {
ch := make(chan telegraf.Metric, 10)
metrics := []telegraf.Metric{}
acc := NewAccumulator(&TestMetricMaker{}, ch).WithTracking(1)
id := acc.AddTrackingMetricGroup(metrics)
select {
case tracking := <-acc.Delivered():
require.Equal(t, tracking.ID(), id)
default:
t.Fatal("empty group should be delivered immediately")
}
}
type TestMetricMaker struct { type TestMetricMaker struct {
} }

View File

@ -50,7 +50,7 @@ type trackingData struct {
rc int32 rc int32
acceptCount int32 acceptCount int32
rejectCount int32 rejectCount int32
notify NotifyFunc notifyFunc NotifyFunc
} }
func (d *trackingData) incr() { func (d *trackingData) incr() {
@ -69,6 +69,16 @@ func (d *trackingData) reject() {
atomic.AddInt32(&d.rejectCount, 1) atomic.AddInt32(&d.rejectCount, 1)
} }
func (d *trackingData) notify() {
d.notifyFunc(
&deliveryInfo{
id: d.id,
accepted: int(d.acceptCount),
rejected: int(d.rejectCount),
},
)
}
type trackingMetric struct { type trackingMetric struct {
telegraf.Metric telegraf.Metric
d *trackingData d *trackingData
@ -82,7 +92,7 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric,
rc: 1, rc: 1,
acceptCount: 0, acceptCount: 0,
rejectCount: 0, rejectCount: 0,
notify: fn, notifyFunc: fn,
}, },
} }
@ -98,7 +108,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.
rc: 0, rc: 0,
acceptCount: 0, acceptCount: 0,
rejectCount: 0, rejectCount: 0,
notify: fn, notifyFunc: fn,
} }
for i, m := range group { for i, m := range group {
@ -114,6 +124,10 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.
runtime.SetFinalizer(d, finalizer) runtime.SetFinalizer(d, finalizer)
} }
if len(group) == 0 {
d.notify()
}
return group, d.id return group, d.id
} }
@ -146,13 +160,7 @@ func (m *trackingMetric) decr() {
} }
if v == 0 { if v == 0 {
m.d.notify( m.d.notify()
&deliveryInfo{
id: m.d.id,
accepted: int(m.d.acceptCount),
rejected: int(m.d.rejectCount),
},
)
} }
} }