fix issue with stream parser blocking when data is in buffer (#7631)

This commit is contained in:
Steven Soroka 2020-06-05 18:30:25 -04:00 committed by GitHub
parent 741ea839d2
commit 7ba226a00b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 20 deletions

View File

@ -31678,6 +31678,16 @@ func (m *streamMachine) Next() error {
m.machine.data = expanded m.machine.data = expanded
} }
err := m.machine.exec()
if err != nil {
return err
}
// If we have successfully parsed a full metric line break out
if m.machine.finishMetric {
break
}
n, err := m.reader.Read(m.machine.data[m.machine.pe:]) n, err := m.reader.Read(m.machine.data[m.machine.pe:])
if n == 0 && err == io.EOF { if n == 0 && err == io.EOF {
m.machine.eof = m.machine.pe m.machine.eof = m.machine.pe
@ -31692,16 +31702,6 @@ func (m *streamMachine) Next() error {
m.machine.pe += n m.machine.pe += n
err = m.machine.exec()
if err != nil {
return err
}
// If we have successfully parsed a full metric line break out
if m.machine.finishMetric {
break
}
} }
return nil return nil

View File

@ -506,6 +506,16 @@ func (m *streamMachine) Next() error {
m.machine.data = expanded m.machine.data = expanded
} }
err := m.machine.exec()
if err != nil {
return err
}
// If we have successfully parsed a full metric line break out
if m.machine.finishMetric {
break
}
n, err := m.reader.Read(m.machine.data[m.machine.pe:]) n, err := m.reader.Read(m.machine.data[m.machine.pe:])
if n == 0 && err == io.EOF { if n == 0 && err == io.EOF {
m.machine.eof = m.machine.pe m.machine.eof = m.machine.pe
@ -520,16 +530,6 @@ func (m *streamMachine) Next() error {
m.machine.pe += n m.machine.pe += n
err = m.machine.exec()
if err != nil {
return err
}
// If we have successfully parsed a full metric line break out
if m.machine.finishMetric {
break
}
} }
return nil return nil

View File

@ -3,6 +3,7 @@ package influx
import ( import (
"bytes" "bytes"
"errors" "errors"
"io"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@ -895,3 +896,19 @@ func TestStreamParserReaderError(t *testing.T) {
_, err = parser.Next() _, err = parser.Next()
require.Equal(t, err, EOF) require.Equal(t, err, EOF)
} }
func TestStreamParserProducesAllAvailableMetrics(t *testing.T) {
r, w := io.Pipe()
parser := NewStreamParser(r)
parser.SetTimeFunc(DefaultTime)
go w.Write([]byte("metric value=1\nmetric2 value=1\n"))
_, err := parser.Next()
require.NoError(t, err)
// should not block on second read
_, err = parser.Next()
require.NoError(t, err)
}