From 7ba226a00b11fa78bb7201cc723f1bd7c6c0c435 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Fri, 5 Jun 2020 18:30:25 -0400 Subject: [PATCH] fix issue with stream parser blocking when data is in buffer (#7631) --- plugins/parsers/influx/machine.go | 20 ++++++++++---------- plugins/parsers/influx/machine.go.rl | 20 ++++++++++---------- plugins/parsers/influx/parser_test.go | 17 +++++++++++++++++ 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/plugins/parsers/influx/machine.go b/plugins/parsers/influx/machine.go index 59bd232dd..332b73592 100644 --- a/plugins/parsers/influx/machine.go +++ b/plugins/parsers/influx/machine.go @@ -31678,6 +31678,16 @@ func (m *streamMachine) Next() error { m.machine.data = expanded } + err := m.machine.exec() + if err != nil { + return err + } + + // If we have successfully parsed a full metric line break out + if m.machine.finishMetric { + break + } + n, err := m.reader.Read(m.machine.data[m.machine.pe:]) if n == 0 && err == io.EOF { m.machine.eof = m.machine.pe @@ -31692,16 +31702,6 @@ func (m *streamMachine) Next() error { m.machine.pe += n - err = m.machine.exec() - if err != nil { - return err - } - - // If we have successfully parsed a full metric line break out - if m.machine.finishMetric { - break - } - } return nil diff --git a/plugins/parsers/influx/machine.go.rl b/plugins/parsers/influx/machine.go.rl index 61f49c652..f8f40cd7c 100644 --- a/plugins/parsers/influx/machine.go.rl +++ b/plugins/parsers/influx/machine.go.rl @@ -506,6 +506,16 @@ func (m *streamMachine) Next() error { m.machine.data = expanded } + err := m.machine.exec() + if err != nil { + return err + } + + // If we have successfully parsed a full metric line break out + if m.machine.finishMetric { + break + } + n, err := m.reader.Read(m.machine.data[m.machine.pe:]) if n == 0 && err == io.EOF { m.machine.eof = m.machine.pe @@ -520,16 +530,6 @@ func (m *streamMachine) Next() error { m.machine.pe += n - err = m.machine.exec() - if err != nil { - return err - } - - // If we have successfully parsed a full metric line break out - if m.machine.finishMetric { - break - } - } return nil diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 368ad277d..569eb3a22 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -3,6 +3,7 @@ package influx import ( "bytes" "errors" + "io" "strconv" "strings" "testing" @@ -895,3 +896,19 @@ func TestStreamParserReaderError(t *testing.T) { _, err = parser.Next() require.Equal(t, err, EOF) } + +func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { + r, w := io.Pipe() + + parser := NewStreamParser(r) + parser.SetTimeFunc(DefaultTime) + + go w.Write([]byte("metric value=1\nmetric2 value=1\n")) + + _, err := parser.Next() + require.NoError(t, err) + + // should not block on second read + _, err = parser.Next() + require.NoError(t, err) +}