Fix panic when rejecting empty batch (#5524)

This commit is contained in:
Daniel Nelson 2019-03-04 12:36:19 -08:00 committed by GitHub
parent c0e0da7ef6
commit 0b5811e193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 1 deletions

View File

@ -182,6 +182,10 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
if len(batch) == 0 {
return
}
older := b.dist(b.first, b.batchFirst) older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size free := b.cap - b.size
restore := min(len(batch), free+older) restore := min(len(batch), free+older)
@ -191,7 +195,8 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
rp := b.last rp := b.last
re := b.nextby(rp, restore) re := b.nextby(rp, restore)
b.last = re b.last = re
for rb != rp {
for rb != rp && rp != re {
rp = b.prev(rp) rp = b.prev(rp)
re = b.prev(re) re = b.prev(re)

View File

@ -714,3 +714,15 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
require.Equal(t, 13, reject) require.Equal(t, 13, reject)
require.Equal(t, 5, accept) require.Equal(t, 5, accept)
} }
func TestBuffer_RejectEmptyBatch(t *testing.T) {
b := setup(NewBuffer("test", 5))
batch := b.Batch(2)
b.Add(MetricTime(1))
b.Reject(batch)
b.Add(MetricTime(2))
batch = b.Batch(2)
for _, m := range batch {
require.NotNil(t, m)
}
}