Fix dropwizard parsing error for metrics that need escaped (#4142)

If the dropwizard parser cannot convert the metric name into a valid
line protocol series then we will accept the name as is.
This commit is contained in:
Daniel Nelson 2018-05-14 11:00:03 -07:00 committed by GitHub
parent b13b8a04cf
commit 2c29f8f84a
10 changed files with 8949 additions and 8197 deletions

View File

@ -54,6 +54,8 @@ type Metric interface {
AddField(key string, value interface{}) AddField(key string, value interface{})
RemoveField(key string) RemoveField(key string)
SetTime(t time.Time)
// HashID returns an unique identifier for the series. // HashID returns an unique identifier for the series.
HashID() uint64 HashID() uint64

View File

@ -202,6 +202,10 @@ func (m *metric) RemoveField(key string) {
} }
} }
func (m *metric) SetTime(t time.Time) {
m.tm = t
}
func (m *metric) Copy() telegraf.Metric { func (m *metric) Copy() telegraf.Metric {
m2 := &metric{ m2 := &metric{
name: m.name, name: m.name,

View File

@ -1,7 +1,6 @@
package dropwizard package dropwizard
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating" "github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
@ -19,8 +19,8 @@ var keyEscaper = strings.NewReplacer(" ", "\\ ", ",", "\\,", "=", "\\=")
// Parser parses json inputs containing dropwizard metrics, // Parser parses json inputs containing dropwizard metrics,
// either top-level or embedded inside a json field. // either top-level or embedded inside a json field.
// This parser is using gjon for retrieving paths within the json file. // This parser is using gjson for retrieving paths within the json file.
type Parser struct { type parser struct {
// an optional json path containing the metric registry object // an optional json path containing the metric registry object
// if left empty, the whole json object is parsed as a metric registry // if left empty, the whole json object is parsed as a metric registry
@ -45,15 +45,28 @@ type Parser struct {
// an optional map of default tags to use for metrics // an optional map of default tags to use for metrics
DefaultTags map[string]string DefaultTags map[string]string
// templating configuration separator string
Separator string
Templates []string
templateEngine *templating.Engine templateEngine *templating.Engine
timeFunc metric.TimeFunc
// seriesParser parses line protocol measurement + tags
seriesParser *influx.Parser
}
func NewParser() *parser {
handler := influx.NewMetricHandler()
seriesParser := influx.NewSeriesParser(handler)
parser := &parser{
timeFunc: time.Now,
seriesParser: seriesParser,
}
return parser
} }
// Parse parses the input bytes to an array of metrics // Parse parses the input bytes to an array of metrics
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0) metrics := make([]telegraf.Metric, 0)
@ -100,28 +113,38 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil return metrics, nil
} }
// InitTemplating initializes the templating support func (p *parser) SetTemplates(separator string, templates []string) error {
func (p *Parser) InitTemplating() error { if len(templates) == 0 {
if len(p.Templates) > 0 { p.templateEngine = nil
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*") return nil
templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates) }
p.templateEngine = templateEngine
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return err return err
} }
templateEngine, err := templating.NewEngine(separator, defaultTemplate, templates)
if err != nil {
return err
}
p.separator = separator
p.templateEngine = templateEngine
return nil return nil
} }
// ParseLine is not supported by the dropwizard format // ParseLine is not supported by the dropwizard format
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { func (p *parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line) return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
} }
// SetDefaultTags sets the default tags // SetDefaultTags sets the default tags
func (p *Parser) SetDefaultTags(tags map[string]string) { func (p *parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags p.DefaultTags = tags
} }
func (p *Parser) readTags(buf []byte) map[string]string { func (p *parser) readTags(buf []byte) map[string]string {
if p.TagsPath != "" { if p.TagsPath != "" {
var tagsBytes []byte var tagsBytes []byte
@ -147,7 +170,7 @@ func (p *Parser) readTags(buf []byte) map[string]string {
return tags return tags
} }
func (p *Parser) parseTime(buf []byte) (time.Time, error) { func (p *parser) parseTime(buf []byte) (time.Time, error) {
if p.TimePath != "" { if p.TimePath != "" {
timeFormat := p.TimeFormat timeFormat := p.TimeFormat
@ -157,19 +180,19 @@ func (p *Parser) parseTime(buf []byte) (time.Time, error) {
timeString := gjson.GetBytes(buf, p.TimePath).String() timeString := gjson.GetBytes(buf, p.TimePath).String()
if timeString == "" { if timeString == "" {
err := fmt.Errorf("time not found in JSON path %s", p.TimePath) err := fmt.Errorf("time not found in JSON path %s", p.TimePath)
return time.Now().UTC(), err return p.timeFunc(), err
} }
t, err := time.Parse(timeFormat, timeString) t, err := time.Parse(timeFormat, timeString)
if err != nil { if err != nil {
err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err) err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
return time.Now().UTC(), err return p.timeFunc(), err
} }
return t.UTC(), nil return t.UTC(), nil
} }
return time.Now().UTC(), nil return p.timeFunc(), nil
} }
func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) { func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
var registryBytes []byte var registryBytes []byte
if p.MetricRegistryPath != "" { if p.MetricRegistryPath != "" {
@ -195,11 +218,8 @@ func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
return jsonOut, nil return jsonOut, nil
} }
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric { func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
if dwmsTyped, ok := dwms.(map[string]interface{}); ok {
switch dwmsTyped := dwms.(type) {
case map[string]interface{}:
var metricsBuffer bytes.Buffer
for dwmName, dwmFields := range dwmsTyped { for dwmName, dwmFields := range dwmsTyped {
measurementName := dwmName measurementName := dwmName
tags := make(map[string]string) tags := make(map[string]string)
@ -207,59 +227,46 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
if p.templateEngine != nil { if p.templateEngine != nil {
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName) measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
if len(fieldPrefix) > 0 { if len(fieldPrefix) > 0 {
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator) fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.separator)
} }
} }
tags["metric_type"] = metricType
measurementWithTags := measurementName parsed, err := p.seriesParser.Parse([]byte(measurementName))
for tagName, tagValue := range tags { var m telegraf.Metric
tagKeyValue := fmt.Sprintf("%s=%s", keyEscaper.Replace(tagName), keyEscaper.Replace(tagValue)) if err != nil || len(parsed) != 1 {
measurementWithTags = fmt.Sprintf("%s,%s", measurementWithTags, tagKeyValue) m, err = metric.New(measurementName, map[string]string{}, map[string]interface{}{}, tm)
if err != nil {
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
continue
}
} else {
m = parsed[0]
m.SetTime(tm)
} }
fields := make([]string, 0) m.AddTag("metric_type", metricType)
switch t := dwmFields.(type) { for k, v := range tags {
case map[string]interface{}: // json object m.AddTag(k, v)
for fieldName, fieldValue := range t { }
key := keyEscaper.Replace(fieldPrefix + fieldName)
switch v := fieldValue.(type) { if fields, ok := dwmFields.(map[string]interface{}); ok {
case float64: for k, v := range fields {
fields = append(fields, fmt.Sprintf("%s=%f", key, v)) switch v := v.(type) {
case string: case float64, string, bool:
fields = append(fields, fmt.Sprintf("%s=\"%s\"", key, fieldEscaper.Replace(v))) m.AddField(fieldPrefix+k, v)
case bool: default:
fields = append(fields, fmt.Sprintf("%s=%t", key, v)) // ignore
default: // ignore
} }
} }
default: // ignore
} }
metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", measurementWithTags, metricType)) metrics = append(metrics, m)
metricsBuffer.WriteString(strings.Join(fields, ","))
metricsBuffer.WriteString("\n")
} }
handler := influx.NewMetricHandler()
handler.SetTimeFunc(func() time.Time { return tm })
parser := influx.NewParser(handler)
newMetrics, err := parser.Parse(metricsBuffer.Bytes())
if err != nil {
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
}
return append(metrics, newMetrics...)
default:
return metrics
} }
return metrics
} }
func arraymap(vs []string, f func(string) string) []string { func (p *parser) SetTimeFunc(f metric.TimeFunc) {
vsm := make([]string, len(vs)) p.timeFunc = f
for i, v := range vs {
vsm[i] = f(v)
}
return vsm
} }

View File

@ -4,13 +4,19 @@ import (
"testing" "testing"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"fmt" "fmt"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
var TimeFunc = func() time.Time {
return time.Unix(0, 0)
}
// validEmptyJSON is a valid dropwizard json document, but without any metrics // validEmptyJSON is a valid dropwizard json document, but without any metrics
const validEmptyJSON = ` const validEmptyJSON = `
{ {
@ -24,7 +30,7 @@ const validEmptyJSON = `
` `
func TestParseValidEmptyJSON(t *testing.T) { func TestParseValidEmptyJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
// Most basic vanilla test // Most basic vanilla test
metrics, err := parser.Parse([]byte(validEmptyJSON)) metrics, err := parser.Parse([]byte(validEmptyJSON))
@ -49,7 +55,7 @@ const validCounterJSON = `
` `
func TestParseValidCounterJSON(t *testing.T) { func TestParseValidCounterJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validCounterJSON)) metrics, err := parser.Parse([]byte(validCounterJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -87,11 +93,10 @@ const validEmbeddedCounterJSON = `
func TestParseValidEmbeddedCounterJSON(t *testing.T) { func TestParseValidEmbeddedCounterJSON(t *testing.T) {
timeFormat := "2006-01-02T15:04:05Z07:00" timeFormat := "2006-01-02T15:04:05Z07:00"
metricTime, _ := time.Parse(timeFormat, "2017-02-22T15:33:03.662+03:00") metricTime, _ := time.Parse(timeFormat, "2017-02-22T15:33:03.662+03:00")
parser := Parser{ parser := NewParser()
MetricRegistryPath: "metrics", parser.MetricRegistryPath = "metrics"
TagsPath: "tags", parser.TagsPath = "tags"
TimePath: "time", parser.TimePath = "time"
}
metrics, err := parser.Parse([]byte(validEmbeddedCounterJSON)) metrics, err := parser.Parse([]byte(validEmbeddedCounterJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -109,11 +114,10 @@ func TestParseValidEmbeddedCounterJSON(t *testing.T) {
assert.True(t, metricTime.Equal(metrics[0].Time()), fmt.Sprintf("%s should be equal to %s", metrics[0].Time(), metricTime)) assert.True(t, metricTime.Equal(metrics[0].Time()), fmt.Sprintf("%s should be equal to %s", metrics[0].Time(), metricTime))
// now test json tags through TagPathsMap // now test json tags through TagPathsMap
parser2 := Parser{ parser2 := NewParser()
MetricRegistryPath: "metrics", parser2.MetricRegistryPath = "metrics"
TagPathsMap: map[string]string{"tag1": "tags.tag1"}, parser2.TagPathsMap = map[string]string{"tag1": "tags.tag1"}
TimePath: "time", parser2.TimePath = "time"
}
metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON)) metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON))
assert.NoError(t, err2) assert.NoError(t, err2)
assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags()) assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags())
@ -141,7 +145,7 @@ const validMeterJSON1 = `
` `
func TestParseValidMeterJSON1(t *testing.T) { func TestParseValidMeterJSON1(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validMeterJSON1)) metrics, err := parser.Parse([]byte(validMeterJSON1))
assert.NoError(t, err) assert.NoError(t, err)
@ -181,7 +185,7 @@ const validMeterJSON2 = `
` `
func TestParseValidMeterJSON2(t *testing.T) { func TestParseValidMeterJSON2(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validMeterJSON2)) metrics, err := parser.Parse([]byte(validMeterJSON2))
assert.NoError(t, err) assert.NoError(t, err)
@ -215,7 +219,7 @@ const validGaugeJSON = `
` `
func TestParseValidGaugeJSON(t *testing.T) { func TestParseValidGaugeJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validGaugeJSON)) metrics, err := parser.Parse([]byte(validGaugeJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -254,7 +258,7 @@ const validHistogramJSON = `
` `
func TestParseValidHistogramJSON(t *testing.T) { func TestParseValidHistogramJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validHistogramJSON)) metrics, err := parser.Parse([]byte(validHistogramJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -309,7 +313,7 @@ const validTimerJSON = `
` `
func TestParseValidTimerJSON(t *testing.T) { func TestParseValidTimerJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validTimerJSON)) metrics, err := parser.Parse([]byte(validTimerJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -360,7 +364,7 @@ const validAllJSON = `
` `
func TestParseValidAllJSON(t *testing.T) { func TestParseValidAllJSON(t *testing.T) {
parser := Parser{} parser := NewParser()
metrics, err := parser.Parse([]byte(validAllJSON)) metrics, err := parser.Parse([]byte(validAllJSON))
assert.NoError(t, err) assert.NoError(t, err)
@ -369,18 +373,19 @@ func TestParseValidAllJSON(t *testing.T) {
func TestTagParsingProblems(t *testing.T) { func TestTagParsingProblems(t *testing.T) {
// giving a wrong path results in empty tags // giving a wrong path results in empty tags
parser1 := Parser{MetricRegistryPath: "metrics", TagsPath: "tags1"} parser1 := NewParser()
parser1.MetricRegistryPath = "metrics"
parser1.TagsPath = "tags1"
metrics1, err1 := parser1.Parse([]byte(validEmbeddedCounterJSON)) metrics1, err1 := parser1.Parse([]byte(validEmbeddedCounterJSON))
assert.NoError(t, err1) assert.NoError(t, err1)
assert.Len(t, metrics1, 1) assert.Len(t, metrics1, 1)
assert.Equal(t, map[string]string{"metric_type": "counter"}, metrics1[0].Tags()) assert.Equal(t, map[string]string{"metric_type": "counter"}, metrics1[0].Tags())
// giving a wrong TagsPath falls back to TagPathsMap // giving a wrong TagsPath falls back to TagPathsMap
parser2 := Parser{ parser2 := NewParser()
MetricRegistryPath: "metrics", parser2.MetricRegistryPath = "metrics"
TagsPath: "tags1", parser2.TagsPath = "tags1"
TagPathsMap: map[string]string{"tag1": "tags.tag1"}, parser2.TagPathsMap = map[string]string{"tag1": "tags.tag1"}
}
metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON)) metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON))
assert.NoError(t, err2) assert.NoError(t, err2)
assert.Len(t, metrics2, 1) assert.Len(t, metrics2, 1)
@ -425,23 +430,21 @@ const sampleTemplateJSON = `
` `
func TestParseSampleTemplateJSON(t *testing.T) { func TestParseSampleTemplateJSON(t *testing.T) {
parser := Parser{ parser := NewParser()
Separator: "_", err := parser.SetTemplates("_", []string{
Templates: []string{ "jenkins.* measurement.metric.metric.field",
"jenkins.* measurement.metric.metric.field", "vm.* measurement.measurement.pool.field",
"vm.* measurement.measurement.pool.field", })
}, require.NoError(t, err)
}
parser.InitTemplating()
metrics, err := parser.Parse([]byte(sampleTemplateJSON)) metrics, err := parser.Parse([]byte(sampleTemplateJSON))
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, metrics, 11) require.Len(t, metrics, 11)
jenkinsMetric := search(metrics, "jenkins", nil, "") jenkinsMetric := search(metrics, "jenkins", nil, "")
assert.NotNil(t, jenkinsMetric, "the metrics should contain a jenkins measurement") require.NotNil(t, jenkinsMetric, "the metrics should contain a jenkins measurement")
assert.Equal(t, map[string]interface{}{ require.Equal(t, map[string]interface{}{
"duration_count": float64(1), "duration_count": float64(1),
"duration_max": float64(2), "duration_max": float64(2),
"duration_mean": float64(3), "duration_mean": float64(3),
@ -454,21 +457,21 @@ func TestParseSampleTemplateJSON(t *testing.T) {
"duration_p999": float64(10), "duration_p999": float64(10),
"duration_stddev": float64(11), "duration_stddev": float64(11),
}, jenkinsMetric.Fields()) }, jenkinsMetric.Fields())
assert.Equal(t, map[string]string{"metric_type": "histogram", "metric": "job_building"}, jenkinsMetric.Tags()) require.Equal(t, map[string]string{"metric_type": "histogram", "metric": "job_building"}, jenkinsMetric.Tags())
vmMemoryHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "heap"}, "committed_value") vmMemoryHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "heap"}, "committed_value")
assert.NotNil(t, vmMemoryHeapCommitted) require.NotNil(t, vmMemoryHeapCommitted)
assert.Equal(t, map[string]interface{}{ require.Equal(t, map[string]interface{}{
"committed_value": float64(1), "committed_value": float64(1),
}, vmMemoryHeapCommitted.Fields()) }, vmMemoryHeapCommitted.Fields())
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "heap"}, vmMemoryHeapCommitted.Tags()) require.Equal(t, map[string]string{"metric_type": "gauge", "pool": "heap"}, vmMemoryHeapCommitted.Tags())
vmMemoryNonHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "non-heap"}, "committed_value") vmMemoryNonHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "non-heap"}, "committed_value")
assert.NotNil(t, vmMemoryNonHeapCommitted) require.NotNil(t, vmMemoryNonHeapCommitted)
assert.Equal(t, map[string]interface{}{ require.Equal(t, map[string]interface{}{
"committed_value": float64(6), "committed_value": float64(6),
}, vmMemoryNonHeapCommitted.Fields()) }, vmMemoryNonHeapCommitted.Fields())
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "non-heap"}, vmMemoryNonHeapCommitted.Tags()) require.Equal(t, map[string]string{"metric_type": "gauge", "pool": "non-heap"}, vmMemoryNonHeapCommitted.Tags())
} }
func search(metrics []telegraf.Metric, name string, tags map[string]string, fieldName string) telegraf.Metric { func search(metrics []telegraf.Metric, name string, tags map[string]string, fieldName string) telegraf.Metric {
@ -493,3 +496,105 @@ func containsAll(t1 map[string]string, t2 map[string]string) bool {
} }
return true return true
} }
func Metric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}
func NoError(t *testing.T, err error) {
require.NoError(t, err)
}
func TestDropWizard(t *testing.T) {
tests := []struct {
name string
input []byte
metrics []telegraf.Metric
errFunc func(t *testing.T, err error)
}{
{
name: "minimal",
input: []byte(`{"version": "3.0.0", "counters": {"cpu": {"value": 42}}}`),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42.0,
},
TimeFunc(),
),
),
},
errFunc: NoError,
},
{
name: "name with space unescaped",
input: []byte(`{"version": "3.0.0", "counters": {"hello world": {"value": 42}}}`),
metrics: []telegraf.Metric{
Metric(
metric.New(
"hello world",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42.0,
},
TimeFunc(),
),
),
},
errFunc: NoError,
},
{
name: "name with space single slash escaped is not valid JSON",
input: []byte(`{"version": "3.0.0", "counters": {"hello\ world": {"value": 42}}}`),
errFunc: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
name: "name with space double slash escape",
input: []byte(`{"version": "3.0.0", "counters": {"hello\\ world": {"value": 42}}}`),
metrics: []telegraf.Metric{
Metric(
metric.New(
"hello world",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42.0,
},
TimeFunc(),
),
),
},
errFunc: NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := NewParser()
parser.SetTimeFunc(TimeFunc)
metrics, err := parser.Parse(tt.input)
tt.errFunc(t, err)
require.Equal(t, len(tt.metrics), len(metrics))
for i, expected := range tt.metrics {
require.Equal(t, expected.Name(), metrics[i].Name())
require.Equal(t, expected.Tags(), metrics[i].Tags())
require.Equal(t, expected.Fields(), metrics[i].Fields())
require.Equal(t, expected.Time(), metrics[i].Time())
}
})
}
}

File diff suppressed because it is too large Load Diff

View File

@ -221,6 +221,8 @@ discard_line :=
# main machine. # main machine.
align := align :=
(space* comment)* space* measurement_start @hold_recover %eof(yield); (space* comment)* space* measurement_start @hold_recover %eof(yield);
series := measurement tagset $err(parse_error) eol;
}%% }%%
%% write data; %% write data;
@ -242,12 +244,30 @@ type machine struct {
p, pe, eof int p, pe, eof int
pb int pb int
handler Handler handler Handler
initState int
err error err error
} }
func NewMachine(handler Handler) *machine { func NewMachine(handler Handler) *machine {
m := &machine{ m := &machine{
handler: handler, handler: handler,
initState: LineProtocol_en_align,
}
%% access m.;
%% variable p m.p;
%% variable pe m.pe;
%% variable eof m.eof;
%% variable data m.data;
%% write init;
return m
}
func NewSeriesMachine(handler Handler) *machine {
m := &machine{
handler: handler,
initState: LineProtocol_en_series,
} }
%% access m.; %% access m.;
@ -269,7 +289,7 @@ func (m *machine) SetData(data []byte) {
m.err = nil m.err = nil
%% write init; %% write init;
m.cs = LineProtocol_en_align m.cs = m.initState
} }
// ParseLine parses a line of input and returns true if more data can be // ParseLine parses a line of input and returns true if more data can be

View File

@ -1390,3 +1390,80 @@ func BenchmarkMachineProcstat(b *testing.B) {
} }
} }
} }
func TestSeriesMachine(t *testing.T) {
var tests = []struct {
name string
input []byte
results []Result
err error
}{
{
name: "empty string",
input: []byte(""),
results: nil,
},
{
name: "no tags",
input: []byte("cpu"),
results: []Result{
Result{
Name: Measurement,
Value: []byte("cpu"),
},
},
},
{
name: "tags",
input: []byte("cpu,a=x,b=y"),
results: []Result{
Result{
Name: Measurement,
Value: []byte("cpu"),
},
Result{
Name: TagKey,
Value: []byte("a"),
},
Result{
Name: TagValue,
Value: []byte("x"),
},
Result{
Name: TagKey,
Value: []byte("b"),
},
Result{
Name: TagValue,
Value: []byte("y"),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := &TestingHandler{}
fsm := NewSeriesMachine(handler)
fsm.SetData(tt.input)
count := 0
for fsm.ParseLine() {
if fsm.Err() != nil {
handler.AddError(fsm.Err())
}
count++
if count > 20 {
break
}
}
if fsm.Err() != nil {
handler.AddError(fsm.Err())
}
results := handler.Results()
require.Equal(t, tt.results, results)
})
}
}

View File

@ -38,6 +38,7 @@ type Parser struct {
handler *MetricHandler handler *MetricHandler
} }
// NewParser returns a Parser than accepts line protocol
func NewParser(handler *MetricHandler) *Parser { func NewParser(handler *MetricHandler) *Parser {
return &Parser{ return &Parser{
machine: NewMachine(handler), machine: NewMachine(handler),
@ -45,6 +46,14 @@ func NewParser(handler *MetricHandler) *Parser {
} }
} }
// NewSeriesParser returns a Parser than accepts a measurement and tagset
func NewSeriesParser(handler *MetricHandler) *Parser {
return &Parser{
machine: NewSeriesMachine(handler),
handler: handler,
}
}
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()

View File

@ -616,3 +616,83 @@ func BenchmarkParser(b *testing.B) {
}) })
} }
} }
func TestSeriesParser(t *testing.T) {
var tests = []struct {
name string
input []byte
timeFunc func() time.Time
precision time.Duration
metrics []telegraf.Metric
err error
}{
{
name: "empty",
input: []byte(""),
metrics: []telegraf.Metric{},
},
{
name: "minimal",
input: []byte("cpu"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(0, 0),
),
),
},
},
{
name: "tags",
input: []byte("cpu,a=x,b=y"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{
"a": "x",
"b": "y",
},
map[string]interface{}{},
time.Unix(0, 0),
),
),
},
},
{
name: "missing tag value",
input: []byte("cpu,a="),
metrics: []telegraf.Metric{},
err: &ParseError{
Offset: 6,
msg: ErrTagParse.Error(),
buf: "cpu,a=",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewMetricHandler()
handler.SetTimeFunc(DefaultTime)
if tt.timeFunc != nil {
handler.SetTimeFunc(tt.timeFunc)
}
if tt.precision > 0 {
handler.SetTimePrecision(tt.precision)
}
parser := NewSeriesParser(handler)
metrics, err := parser.Parse(tt.input)
require.Equal(t, tt.err, err)
require.Equal(t, len(tt.metrics), len(metrics))
for i, expected := range tt.metrics {
require.Equal(t, expected.Name(), metrics[i].Name())
require.Equal(t, expected.Tags(), metrics[i].Tags())
}
})
}
}

View File

@ -26,11 +26,15 @@ type Parser interface {
// Parse takes a byte buffer separated by newlines // Parse takes a byte buffer separated by newlines
// ie, `cpu.usage.idle 90\ncpu.usage.busy 10` // ie, `cpu.usage.idle 90\ncpu.usage.busy 10`
// and parses it into telegraf metrics // and parses it into telegraf metrics
//
// Must be thread-safe.
Parse(buf []byte) ([]telegraf.Metric, error) Parse(buf []byte) ([]telegraf.Metric, error)
// ParseLine takes a single string metric // ParseLine takes a single string metric
// ie, "cpu.usage.idle 90" // ie, "cpu.usage.idle 90"
// and parses it into a telegraf metric. // and parses it into a telegraf metric.
//
// Must be thread-safe.
ParseLine(line string) (telegraf.Metric, error) ParseLine(line string) (telegraf.Metric, error)
// SetDefaultTags tells the parser to add all of the given tags // SetDefaultTags tells the parser to add all of the given tags
@ -107,9 +111,15 @@ func NewParser(config *Config) (Parser, error) {
parser, err = NewCollectdParser(config.CollectdAuthFile, parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB) config.CollectdSecurityLevel, config.CollectdTypesDB)
case "dropwizard": case "dropwizard":
parser, err = NewDropwizardParser(config.DropwizardMetricRegistryPath, parser, err = NewDropwizardParser(
config.DropwizardTimePath, config.DropwizardTimeFormat, config.DropwizardTagsPath, config.DropwizardTagPathsMap, config.DefaultTags, config.DropwizardMetricRegistryPath,
config.Separator, config.Templates) config.DropwizardTimePath,
config.DropwizardTimeFormat,
config.DropwizardTagsPath,
config.DropwizardTagPathsMap,
config.DefaultTags,
config.Separator,
config.Templates)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
@ -177,17 +187,16 @@ func NewDropwizardParser(
templates []string, templates []string,
) (Parser, error) { ) (Parser, error) {
parser := &dropwizard.Parser{ parser := dropwizard.NewParser()
MetricRegistryPath: metricRegistryPath, parser.MetricRegistryPath = metricRegistryPath
TimePath: timePath, parser.TimePath = timePath
TimeFormat: timeFormat, parser.TimeFormat = timeFormat
TagsPath: tagsPath, parser.TagsPath = tagsPath
TagPathsMap: tagPathsMap, parser.TagPathsMap = tagPathsMap
DefaultTags: defaultTags, parser.DefaultTags = defaultTags
Separator: separator, err := parser.SetTemplates(separator, templates)
Templates: templates, if err != nil {
return nil, err
} }
err := parser.InitTemplating()
return parser, err return parser, err
} }