Fix dropwizard parsing error for metrics that need escaped (#4142)
If the dropwizard parser cannot convert the metric name into a valid
line protocol series then we will accept the name as is.
(cherry picked from commit 0af40a8a5d)
This commit is contained in:
committed by
Daniel Nelson
parent
03141eaad2
commit
8301861b1b
File diff suppressed because it is too large
Load Diff
@@ -221,6 +221,8 @@ discard_line :=
|
||||
# main machine.
|
||||
align :=
|
||||
(space* comment)* space* measurement_start @hold_recover %eof(yield);
|
||||
|
||||
series := measurement tagset $err(parse_error) eol;
|
||||
}%%
|
||||
|
||||
%% write data;
|
||||
@@ -242,12 +244,30 @@ type machine struct {
|
||||
p, pe, eof int
|
||||
pb int
|
||||
handler Handler
|
||||
initState int
|
||||
err error
|
||||
}
|
||||
|
||||
func NewMachine(handler Handler) *machine {
|
||||
m := &machine{
|
||||
handler: handler,
|
||||
initState: LineProtocol_en_align,
|
||||
}
|
||||
|
||||
%% access m.;
|
||||
%% variable p m.p;
|
||||
%% variable pe m.pe;
|
||||
%% variable eof m.eof;
|
||||
%% variable data m.data;
|
||||
%% write init;
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func NewSeriesMachine(handler Handler) *machine {
|
||||
m := &machine{
|
||||
handler: handler,
|
||||
initState: LineProtocol_en_series,
|
||||
}
|
||||
|
||||
%% access m.;
|
||||
@@ -269,7 +289,7 @@ func (m *machine) SetData(data []byte) {
|
||||
m.err = nil
|
||||
|
||||
%% write init;
|
||||
m.cs = LineProtocol_en_align
|
||||
m.cs = m.initState
|
||||
}
|
||||
|
||||
// ParseLine parses a line of input and returns true if more data can be
|
||||
|
||||
@@ -1390,3 +1390,80 @@ func BenchmarkMachineProcstat(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeriesMachine(t *testing.T) {
|
||||
var tests = []struct {
|
||||
name string
|
||||
input []byte
|
||||
results []Result
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "empty string",
|
||||
input: []byte(""),
|
||||
results: nil,
|
||||
},
|
||||
{
|
||||
name: "no tags",
|
||||
input: []byte("cpu"),
|
||||
results: []Result{
|
||||
Result{
|
||||
Name: Measurement,
|
||||
Value: []byte("cpu"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tags",
|
||||
input: []byte("cpu,a=x,b=y"),
|
||||
results: []Result{
|
||||
Result{
|
||||
Name: Measurement,
|
||||
Value: []byte("cpu"),
|
||||
},
|
||||
Result{
|
||||
Name: TagKey,
|
||||
Value: []byte("a"),
|
||||
},
|
||||
Result{
|
||||
Name: TagValue,
|
||||
Value: []byte("x"),
|
||||
},
|
||||
Result{
|
||||
Name: TagKey,
|
||||
Value: []byte("b"),
|
||||
},
|
||||
Result{
|
||||
Name: TagValue,
|
||||
Value: []byte("y"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
handler := &TestingHandler{}
|
||||
fsm := NewSeriesMachine(handler)
|
||||
fsm.SetData(tt.input)
|
||||
|
||||
count := 0
|
||||
for fsm.ParseLine() {
|
||||
if fsm.Err() != nil {
|
||||
handler.AddError(fsm.Err())
|
||||
}
|
||||
count++
|
||||
if count > 20 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if fsm.Err() != nil {
|
||||
handler.AddError(fsm.Err())
|
||||
}
|
||||
|
||||
results := handler.Results()
|
||||
require.Equal(t, tt.results, results)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ type Parser struct {
|
||||
handler *MetricHandler
|
||||
}
|
||||
|
||||
// NewParser returns a Parser than accepts line protocol
|
||||
func NewParser(handler *MetricHandler) *Parser {
|
||||
return &Parser{
|
||||
machine: NewMachine(handler),
|
||||
@@ -45,6 +46,14 @@ func NewParser(handler *MetricHandler) *Parser {
|
||||
}
|
||||
}
|
||||
|
||||
// NewSeriesParser returns a Parser than accepts a measurement and tagset
|
||||
func NewSeriesParser(handler *MetricHandler) *Parser {
|
||||
return &Parser{
|
||||
machine: NewSeriesMachine(handler),
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -616,3 +616,83 @@ func BenchmarkParser(b *testing.B) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeriesParser(t *testing.T) {
|
||||
var tests = []struct {
|
||||
name string
|
||||
input []byte
|
||||
timeFunc func() time.Time
|
||||
precision time.Duration
|
||||
metrics []telegraf.Metric
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
input: []byte(""),
|
||||
metrics: []telegraf.Metric{},
|
||||
},
|
||||
{
|
||||
name: "minimal",
|
||||
input: []byte("cpu"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tags",
|
||||
input: []byte("cpu,a=x,b=y"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"a": "x",
|
||||
"b": "y",
|
||||
},
|
||||
map[string]interface{}{},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing tag value",
|
||||
input: []byte("cpu,a="),
|
||||
metrics: []telegraf.Metric{},
|
||||
err: &ParseError{
|
||||
Offset: 6,
|
||||
msg: ErrTagParse.Error(),
|
||||
buf: "cpu,a=",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
handler := NewMetricHandler()
|
||||
handler.SetTimeFunc(DefaultTime)
|
||||
if tt.timeFunc != nil {
|
||||
handler.SetTimeFunc(tt.timeFunc)
|
||||
}
|
||||
if tt.precision > 0 {
|
||||
handler.SetTimePrecision(tt.precision)
|
||||
}
|
||||
parser := NewSeriesParser(handler)
|
||||
|
||||
metrics, err := parser.Parse(tt.input)
|
||||
require.Equal(t, tt.err, err)
|
||||
|
||||
require.Equal(t, len(tt.metrics), len(metrics))
|
||||
for i, expected := range tt.metrics {
|
||||
require.Equal(t, expected.Name(), metrics[i].Name())
|
||||
require.Equal(t, expected.Tags(), metrics[i].Tags())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user