Fix several influx parser issues (#5484)
- Add line/column position - Allow handlers to return errors - Fix tag value escaping - Allow newline in string fields
This commit is contained in:
@@ -10,6 +10,7 @@ var (
|
||||
ErrTagParse = errors.New("expected tag")
|
||||
ErrTimestampParse = errors.New("expected timestamp")
|
||||
ErrParse = errors.New("parse error")
|
||||
EOF = errors.New("EOF")
|
||||
)
|
||||
|
||||
%%{
|
||||
@@ -19,58 +20,67 @@ action begin {
|
||||
m.pb = m.p
|
||||
}
|
||||
|
||||
action yield {
|
||||
yield = true
|
||||
fnext align;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action name_error {
|
||||
m.err = ErrNameParse
|
||||
err = ErrNameParse
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action field_error {
|
||||
m.err = ErrFieldParse
|
||||
err = ErrFieldParse
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action tagset_error {
|
||||
m.err = ErrTagParse
|
||||
err = ErrTagParse
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action timestamp_error {
|
||||
m.err = ErrTimestampParse
|
||||
err = ErrTimestampParse
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action parse_error {
|
||||
m.err = ErrParse
|
||||
err = ErrParse
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action align_error {
|
||||
err = ErrParse
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
action hold_recover {
|
||||
fhold;
|
||||
fgoto main;
|
||||
}
|
||||
|
||||
action discard {
|
||||
action goto_align {
|
||||
fgoto align;
|
||||
}
|
||||
|
||||
action found_metric {
|
||||
foundMetric = true
|
||||
}
|
||||
|
||||
action name {
|
||||
m.handler.SetMeasurement(m.text())
|
||||
err = m.handler.SetMeasurement(m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action tagkey {
|
||||
@@ -78,7 +88,12 @@ action tagkey {
|
||||
}
|
||||
|
||||
action tagvalue {
|
||||
m.handler.AddTag(key, m.text())
|
||||
err = m.handler.AddTag(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action fieldkey {
|
||||
@@ -86,32 +101,76 @@ action fieldkey {
|
||||
}
|
||||
|
||||
action integer {
|
||||
m.handler.AddInt(key, m.text())
|
||||
err = m.handler.AddInt(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action unsigned {
|
||||
m.handler.AddUint(key, m.text())
|
||||
err = m.handler.AddUint(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action float {
|
||||
m.handler.AddFloat(key, m.text())
|
||||
err = m.handler.AddFloat(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action bool {
|
||||
m.handler.AddBool(key, m.text())
|
||||
err = m.handler.AddBool(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action string {
|
||||
m.handler.AddString(key, m.text())
|
||||
err = m.handler.AddString(key, m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action timestamp {
|
||||
m.handler.SetTimestamp(m.text())
|
||||
err = m.handler.SetTimestamp(m.text())
|
||||
if err != nil {
|
||||
fhold;
|
||||
fnext discard_line;
|
||||
fbreak;
|
||||
}
|
||||
}
|
||||
|
||||
action incr_newline {
|
||||
m.lineno++
|
||||
m.sol = m.p
|
||||
m.sol++ // next char will be the first column in the line
|
||||
}
|
||||
|
||||
action eol {
|
||||
fnext align;
|
||||
fbreak;
|
||||
}
|
||||
|
||||
ws =
|
||||
[\t\v\f ];
|
||||
|
||||
newline =
|
||||
'\r'? '\n' %to(incr_newline);
|
||||
|
||||
non_zero_digit =
|
||||
[1-9];
|
||||
|
||||
@@ -155,7 +214,7 @@ fieldbool =
|
||||
(true | false) >begin %bool;
|
||||
|
||||
fieldstringchar =
|
||||
[^\n\f\r\\"] | '\\' [\\"];
|
||||
[^\f\r\n\\"] | '\\' [\\"] | newline;
|
||||
|
||||
fieldstring =
|
||||
fieldstringchar* >begin %string;
|
||||
@@ -172,16 +231,16 @@ fieldset =
|
||||
field ( ',' field )*;
|
||||
|
||||
tagchar =
|
||||
[^\t\n\f\r ,=\\] | ( '\\' [^\t\n\f\r] );
|
||||
[^\t\n\f\r ,=\\] | ( '\\' [^\t\n\f\r\\] ) | '\\\\' %to{ fhold; };
|
||||
|
||||
tagkey =
|
||||
tagchar+ >begin %tagkey;
|
||||
|
||||
tagvalue =
|
||||
tagchar+ >begin %tagvalue;
|
||||
tagchar+ >begin %eof(tagvalue) %tagvalue;
|
||||
|
||||
tagset =
|
||||
(',' (tagkey '=' tagvalue) $err(tagset_error))*;
|
||||
((',' tagkey '=' tagvalue) $err(tagset_error))*;
|
||||
|
||||
measurement_chars =
|
||||
[^\t\n\f\r ,\\] | ( '\\' [^\t\n\f\r] );
|
||||
@@ -190,52 +249,71 @@ measurement_start =
|
||||
measurement_chars - '#';
|
||||
|
||||
measurement =
|
||||
(measurement_start measurement_chars*) >begin %name;
|
||||
(measurement_start measurement_chars*) >begin %eof(name) %name;
|
||||
|
||||
newline =
|
||||
[\r\n];
|
||||
eol_break =
|
||||
newline %to(eol)
|
||||
;
|
||||
|
||||
comment =
|
||||
'#' (any -- newline)* newline;
|
||||
|
||||
eol =
|
||||
ws* newline? >yield %eof(yield);
|
||||
|
||||
line =
|
||||
measurement
|
||||
metric =
|
||||
measurement >err(name_error)
|
||||
tagset
|
||||
(ws+ fieldset) $err(field_error)
|
||||
ws+ fieldset $err(field_error)
|
||||
(ws+ timestamp)? $err(timestamp_error)
|
||||
eol;
|
||||
;
|
||||
|
||||
# The main machine parses a single line of line protocol.
|
||||
main := line $err(parse_error);
|
||||
line_with_term =
|
||||
ws* metric ws* eol_break
|
||||
;
|
||||
|
||||
line_without_term =
|
||||
ws* metric ws*
|
||||
;
|
||||
|
||||
main :=
|
||||
(line_with_term*
|
||||
(line_with_term | line_without_term?)
|
||||
) >found_metric
|
||||
;
|
||||
|
||||
# The discard_line machine discards the current line. Useful for recovering
|
||||
# on the next line when an error occurs.
|
||||
discard_line :=
|
||||
(any - newline)* newline @discard;
|
||||
(any -- newline)* newline @goto_align;
|
||||
|
||||
commentline =
|
||||
ws* '#' (any -- newline)* newline;
|
||||
|
||||
emptyline =
|
||||
ws* newline;
|
||||
|
||||
# The align machine scans forward to the start of the next line. This machine
|
||||
# is used to skip over whitespace and comments, keeping this logic out of the
|
||||
# main machine.
|
||||
#
|
||||
# Skip valid lines that don't contain line protocol, any other data will move
|
||||
# control to the main parser via the err action.
|
||||
align :=
|
||||
(space* comment)* space* measurement_start @hold_recover %eof(yield);
|
||||
(emptyline | commentline | ws+)* %err(hold_recover);
|
||||
|
||||
series := measurement tagset $err(parse_error) eol;
|
||||
# Series is a machine for matching measurement+tagset
|
||||
series :=
|
||||
(measurement >err(name_error) tagset eol_break?)
|
||||
>found_metric
|
||||
;
|
||||
}%%
|
||||
|
||||
%% write data;
|
||||
|
||||
type Handler interface {
|
||||
SetMeasurement(name []byte)
|
||||
AddTag(key []byte, value []byte)
|
||||
AddInt(key []byte, value []byte)
|
||||
AddUint(key []byte, value []byte)
|
||||
AddFloat(key []byte, value []byte)
|
||||
AddString(key []byte, value []byte)
|
||||
AddBool(key []byte, value []byte)
|
||||
SetTimestamp(tm []byte)
|
||||
SetMeasurement(name []byte) error
|
||||
AddTag(key []byte, value []byte) error
|
||||
AddInt(key []byte, value []byte) error
|
||||
AddUint(key []byte, value []byte) error
|
||||
AddFloat(key []byte, value []byte) error
|
||||
AddString(key []byte, value []byte) error
|
||||
AddBool(key []byte, value []byte) error
|
||||
SetTimestamp(tm []byte) error
|
||||
}
|
||||
|
||||
type machine struct {
|
||||
@@ -243,9 +321,10 @@ type machine struct {
|
||||
cs int
|
||||
p, pe, eof int
|
||||
pb int
|
||||
lineno int
|
||||
sol int
|
||||
handler Handler
|
||||
initState int
|
||||
err error
|
||||
}
|
||||
|
||||
func NewMachine(handler Handler) *machine {
|
||||
@@ -256,6 +335,7 @@ func NewMachine(handler Handler) *machine {
|
||||
|
||||
%% access m.;
|
||||
%% variable p m.p;
|
||||
%% variable cs m.cs;
|
||||
%% variable pe m.pe;
|
||||
%% variable eof m.eof;
|
||||
%% variable data m.data;
|
||||
@@ -284,55 +364,76 @@ func (m *machine) SetData(data []byte) {
|
||||
m.data = data
|
||||
m.p = 0
|
||||
m.pb = 0
|
||||
m.lineno = 1
|
||||
m.sol = 0
|
||||
m.pe = len(data)
|
||||
m.eof = len(data)
|
||||
m.err = nil
|
||||
|
||||
%% write init;
|
||||
m.cs = m.initState
|
||||
}
|
||||
|
||||
// ParseLine parses a line of input and returns true if more data can be
|
||||
// parsed.
|
||||
func (m *machine) ParseLine() bool {
|
||||
if m.data == nil || m.p >= m.pe {
|
||||
m.err = nil
|
||||
return false
|
||||
// Next parses the next metric line and returns nil if it was successfully
|
||||
// processed. If the line contains a syntax error an error is returned,
|
||||
// otherwise if the end of file is reached before finding a metric line then
|
||||
// EOF is returned.
|
||||
func (m *machine) Next() error {
|
||||
if m.p == m.pe && m.pe == m.eof {
|
||||
return EOF
|
||||
}
|
||||
|
||||
m.err = nil
|
||||
var err error
|
||||
var key []byte
|
||||
var yield bool
|
||||
foundMetric := false
|
||||
|
||||
%% write exec;
|
||||
|
||||
// Even if there was an error, return true. On the next call to this
|
||||
// function we will attempt to scan to the next line of input and recover.
|
||||
if m.err != nil {
|
||||
return true
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Don't check the error state in the case that we just yielded, because
|
||||
// the yield indicates we just completed parsing a line.
|
||||
if !yield && m.cs == LineProtocol_error {
|
||||
m.err = ErrParse
|
||||
return true
|
||||
// This would indicate an error in the machine that was reported with a
|
||||
// more specific error. We return a generic error but this should
|
||||
// possibly be a panic.
|
||||
if m.cs == %%{ write error; }%% {
|
||||
m.cs = LineProtocol_en_discard_line
|
||||
return ErrParse
|
||||
}
|
||||
|
||||
return true
|
||||
// If we haven't found a metric line yet and we reached the EOF, report it
|
||||
// now. This happens when the data ends with a comment or whitespace.
|
||||
//
|
||||
// Otherwise we have successfully parsed a metric line, so if we are at
|
||||
// the EOF we will report it the next call.
|
||||
if !foundMetric && m.p == m.pe && m.pe == m.eof {
|
||||
return EOF
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Err returns the error that occurred on the last call to ParseLine. If the
|
||||
// result is nil, then the line was parsed successfully.
|
||||
func (m *machine) Err() error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
// Position returns the current position into the input.
|
||||
// Position returns the current byte offset into the data.
|
||||
func (m *machine) Position() int {
|
||||
return m.p
|
||||
}
|
||||
|
||||
// LineOffset returns the byte offset of the current line.
|
||||
func (m *machine) LineOffset() int {
|
||||
return m.sol
|
||||
}
|
||||
|
||||
// LineNumber returns the current line number. Lines are counted based on the
|
||||
// regular expression `\r?\n`.
|
||||
func (m *machine) LineNumber() int {
|
||||
return m.lineno
|
||||
}
|
||||
|
||||
// Column returns the current column.
|
||||
func (m *machine) Column() int {
|
||||
lineOffset := m.p - m.sol
|
||||
return lineOffset + 1
|
||||
}
|
||||
|
||||
func (m *machine) text() []byte {
|
||||
return m.data[m.pb:m.p]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user