Allow CR and FF inside of string fields and fix parser panic (#7427)

This commit is contained in:
Daniel Nelson 2020-04-28 14:42:37 -07:00 committed by GitHub
parent 476a899a1a
commit cf3d48bb68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 16653 additions and 16901 deletions

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,14 @@ import (
"io" "io"
) )
type readErr struct {
Err error
}
func (e *readErr) Error() string {
return e.Err.Error()
}
var ( var (
ErrNameParse = errors.New("expected measurement name") ErrNameParse = errors.New("expected measurement name")
ErrFieldParse = errors.New("expected field") ErrFieldParse = errors.New("expected field")
@ -220,7 +228,7 @@ fieldbool =
(true | false) >begin %bool; (true | false) >begin %bool;
fieldstringchar = fieldstringchar =
[^\f\r\n\\"] | '\\' [\\"] | newline; [^\n\\"] | '\\' [\\"] | newline;
fieldstring = fieldstring =
fieldstringchar* >begin %string; fieldstringchar* >begin %string;
@ -502,7 +510,12 @@ func (m *streamMachine) Next() error {
if n == 0 && err == io.EOF { if n == 0 && err == io.EOF {
m.machine.eof = m.machine.pe m.machine.eof = m.machine.pe
} else if err != nil && err != io.EOF { } else if err != nil && err != io.EOF {
return err // After the reader returns an error this function shouldn't be
// called again. This will cause the machine to return EOF this
// is done.
m.machine.p = m.machine.pe
m.machine.eof = m.machine.pe
return &readErr{Err: err}
} }
m.machine.pe += n m.machine.pe += n

View File

@ -873,6 +873,27 @@ var tests = []struct {
}, },
}, },
}, },
{
name: "cr in string field",
input: []byte("cpu value=\"4\r2\""),
results: []Result{
{
Name: Measurement,
Value: []byte("cpu"),
},
{
Name: FieldKey,
Value: []byte("value"),
},
{
Name: FieldString,
Value: []byte("4\r2"),
},
{
Name: Success,
},
},
},
{ {
name: "bool field", name: "bool field",
input: []byte("cpu value=true"), input: []byte("cpu value=true"),

View File

@ -174,11 +174,15 @@ func (h *StreamParser) SetTimePrecision(u time.Duration) {
} }
// Next parses the next item from the stream. You can repeat calls to this // Next parses the next item from the stream. You can repeat calls to this
// function until it returns EOF. // function if it returns ParseError to get the next metric or error.
func (p *StreamParser) Next() (telegraf.Metric, error) { func (p *StreamParser) Next() (telegraf.Metric, error) {
err := p.machine.Next() err := p.machine.Next()
if err == EOF { if err == EOF {
return nil, EOF return nil, err
}
if e, ok := err.(*readErr); ok {
return nil, e.Err
} }
if err != nil { if err != nil {

View File

@ -2,6 +2,7 @@ package influx
import ( import (
"bytes" "bytes"
"errors"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@ -869,3 +870,28 @@ func TestStreamParserErrorString(t *testing.T) {
}) })
} }
} }
type MockReader struct {
ReadF func(p []byte) (int, error)
}
func (r *MockReader) Read(p []byte) (int, error) {
return r.ReadF(p)
}
// Errors from the Reader are returned from the Parser
func TestStreamParserReaderError(t *testing.T) {
readerErr := errors.New("error but not eof")
parser := NewStreamParser(&MockReader{
ReadF: func(p []byte) (int, error) {
return 0, readerErr
},
})
_, err := parser.Next()
require.Error(t, err)
require.Equal(t, err, readerErr)
_, err = parser.Next()
require.Equal(t, err, EOF)
}