diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 417238ec3..00ead6e38 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](#collectd) 1. [Dropwizard](#dropwizard) 1. [Grok](#grok) +1. [Wavefront](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#wavefront) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -881,3 +882,29 @@ the file output will only print once per `flush_interval`. - Continue one token at a time until the entire line is successfully parsed. +``` + +# Wavefront: + +Wavefront Data Format is metrics are parsed directly into Telegraf metrics. +For more information about the Wavefront Data Format see +[here](https://docs.wavefront.com/wavefront_data_format.html). + +There are no additional configuration options for Wavefront Data Format line-protocol. + +#### Wavefront Configuration: + +```toml +[[inputs.exec]] + ## Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ## measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "wavefront" +``` diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index 18c5a6495..ef36d1804 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -189,26 +189,32 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string } var source string - sourceTagFound := false - for _, s := range w.SourceOverride { - for k, v := range mTags { - if k == s { - source = v - mTags["telegraf_host"] = mTags["host"] - sourceTagFound = true - delete(mTags, k) + if s, ok := mTags["source"]; ok { + source = s + delete(mTags, "source") + } else { + sourceTagFound := false + for _, s := range w.SourceOverride { + for k, v := range mTags { + if k == s { + source = v + mTags["telegraf_host"] = mTags["host"] + sourceTagFound = true + delete(mTags, k) + break + } + } + if sourceTagFound { break } } - if sourceTagFound { - break + + if !sourceTagFound { + source = mTags["host"] } } - if !sourceTagFound { - source = mTags["host"] - } delete(mTags, "host") return tagValueReplacer.Replace(source), mTags diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 24e73d4b6..1e395047a 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/value" + "github.com/influxdata/telegraf/plugins/parsers/wavefront" ) // ParserInput is an interface for input plugins that are able to parse @@ -131,6 +132,8 @@ func NewParser(config *Config) (Parser, error) { config.DefaultTags, config.Separator, config.Templates) + case "wavefront": + parser, err = NewWavefrontParser(config.DefaultTags) case "grok": parser, err = newGrokParser( config.MetricName, @@ -238,3 +241,7 @@ func NewDropwizardParser( } return parser, err } + +func NewWavefrontParser(defaultTags map[string]string) (Parser, error) { + return wavefront.NewWavefrontParser(defaultTags), nil +} diff --git a/plugins/parsers/wavefront/element.go b/plugins/parsers/wavefront/element.go new file mode 100644 index 000000000..4e40238e7 --- /dev/null +++ b/plugins/parsers/wavefront/element.go @@ -0,0 +1,238 @@ +package wavefront + +import ( + "errors" + "fmt" + "strconv" + "time" +) + +var ( + ErrEOF = errors.New("EOF") + ErrInvalidTimestamp = errors.New("Invalid timestamp") +) + +// Interface for parsing line elements. +type ElementParser interface { + parse(p *PointParser, pt *Point) error +} + +type NameParser struct{} +type ValueParser struct{} +type TimestampParser struct { + optional bool +} +type WhiteSpaceParser struct { + nextOptional bool +} +type TagParser struct{} +type LoopedParser struct { + wrappedParser ElementParser + wsPaser *WhiteSpaceParser +} +type LiteralParser struct { + literal string +} + +func (ep *NameParser) parse(p *PointParser, pt *Point) error { + //Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot ("."). + // Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes. + name, err := parseLiteral(p) + if err != nil { + return err + } + pt.Name = name + return nil +} + +func (ep *ValueParser) parse(p *PointParser, pt *Point) error { + tok, lit := p.scan() + if tok == EOF { + return fmt.Errorf("found %q, expected number", lit) + } + + p.writeBuf.Reset() + if tok == MINUS_SIGN { + p.writeBuf.WriteString(lit) + tok, lit = p.scan() + } + + for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) { + p.writeBuf.WriteString(lit) + tok, lit = p.scan() + } + p.unscan() + + pt.Value = p.writeBuf.String() + _, err := strconv.ParseFloat(pt.Value, 64) + if err != nil { + return fmt.Errorf("invalid metric value %s", pt.Value) + } + return nil +} + +func (ep *TimestampParser) parse(p *PointParser, pt *Point) error { + tok, lit := p.scan() + if tok == EOF { + if ep.optional { + p.unscanTokens(2) + return setTimestamp(pt, 0, 1) + } + return fmt.Errorf("found %q, expected number", lit) + } + + if tok != NUMBER { + if ep.optional { + p.unscanTokens(2) + return setTimestamp(pt, 0, 1) + } + return ErrInvalidTimestamp + } + + p.writeBuf.Reset() + for tok != EOF && tok == NUMBER { + p.writeBuf.WriteString(lit) + tok, lit = p.scan() + } + p.unscan() + + tsStr := p.writeBuf.String() + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + return err + } + return setTimestamp(pt, ts, len(tsStr)) +} + +func setTimestamp(pt *Point, ts int64, numDigits int) error { + + if numDigits == 19 { + // nanoseconds + ts = ts / 1e9 + } else if numDigits == 16 { + // microseconds + ts = ts / 1e6 + } else if numDigits == 13 { + // milliseconds + ts = ts / 1e3 + } else if numDigits != 10 { + // must be in seconds, return error if not 0 + if ts == 0 { + ts = getCurrentTime() + } else { + return ErrInvalidTimestamp + } + } + pt.Timestamp = ts + return nil +} + +func (ep *LoopedParser) parse(p *PointParser, pt *Point) error { + for { + err := ep.wrappedParser.parse(p, pt) + if err != nil { + return err + } + err = ep.wsPaser.parse(p, pt) + if err == ErrEOF { + break + } + } + return nil +} + +func (ep *TagParser) parse(p *PointParser, pt *Point) error { + k, err := parseLiteral(p) + if err != nil { + if k == "" { + return nil + } + return err + } + + next, lit := p.scan() + if next != EQUALS { + return fmt.Errorf("found %q, expected equals", lit) + } + + v, err := parseLiteral(p) + if err != nil { + return err + } + if len(pt.Tags) == 0 { + pt.Tags = make(map[string]string) + } + pt.Tags[k] = v + return nil +} + +func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error { + tok := WS + for tok != EOF && tok == WS { + tok, _ = p.scan() + } + + if tok == EOF { + if !ep.nextOptional { + return ErrEOF + } + return nil + } + p.unscan() + return nil +} + +func (ep *LiteralParser) parse(p *PointParser, pt *Point) error { + l, err := parseLiteral(p) + if err != nil { + return err + } + + if l != ep.literal { + return fmt.Errorf("found %s, expected %s", l, ep.literal) + } + return nil +} + +func parseQuotedLiteral(p *PointParser) (string, error) { + p.writeBuf.Reset() + + escaped := false + tok, lit := p.scan() + for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) { + // let everything through + escaped = tok == BACKSLASH + p.writeBuf.WriteString(lit) + tok, lit = p.scan() + } + if tok == EOF { + return "", fmt.Errorf("found %q, expected quotes", lit) + } + return p.writeBuf.String(), nil +} + +func parseLiteral(p *PointParser) (string, error) { + tok, lit := p.scan() + if tok == EOF { + return "", fmt.Errorf("found %q, expected literal", lit) + } + + if tok == QUOTES { + return parseQuotedLiteral(p) + } + + p.writeBuf.Reset() + for tok != EOF && tok > literal_beg && tok < literal_end { + p.writeBuf.WriteString(lit) + tok, lit = p.scan() + } + if tok == QUOTES { + return "", errors.New("found quote inside unquoted literal") + } + p.unscan() + return p.writeBuf.String(), nil +} + +func getCurrentTime() int64 { + return time.Now().UnixNano() / 1e9 +} diff --git a/plugins/parsers/wavefront/parser.go b/plugins/parsers/wavefront/parser.go new file mode 100644 index 000000000..f5fc88dbf --- /dev/null +++ b/plugins/parsers/wavefront/parser.go @@ -0,0 +1,203 @@ +package wavefront + +import ( + "bufio" + "bytes" + "io" + "log" + "strconv" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +const MAX_BUFFER_SIZE = 2 + +type Point struct { + Name string + Value string + Timestamp int64 + Source string + Tags map[string]string +} + +// Parser represents a parser. +type PointParser struct { + s *PointScanner + buf struct { + tok []Token // last read n tokens + lit []string // last read n literals + n int // unscanned buffer size (max=2) + } + scanBuf bytes.Buffer // buffer reused for scanning tokens + writeBuf bytes.Buffer // buffer reused for parsing elements + Elements []ElementParser + defaultTags map[string]string +} + +// Returns a slice of ElementParser's for the Graphite format +func NewWavefrontElements() []ElementParser { + var elements []ElementParser + wsParser := WhiteSpaceParser{} + wsParserNextOpt := WhiteSpaceParser{nextOptional: true} + repeatParser := LoopedParser{wrappedParser: &TagParser{}, wsPaser: &wsParser} + elements = append(elements, &NameParser{}, &wsParser, &ValueParser{}, &wsParserNextOpt, + &TimestampParser{optional: true}, &wsParserNextOpt, &repeatParser) + return elements +} + +func NewWavefrontParser(defaultTags map[string]string) *PointParser { + elements := NewWavefrontElements() + return &PointParser{Elements: elements, defaultTags: defaultTags} +} + +func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + // add newline to end if not exists: + if len(buf) > 0 && !bytes.HasSuffix(buf, []byte("\n")) { + buf = append(buf, []byte("\n")...) + } + + points := make([]Point, 0) + + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + for { + // Read up to the next newline. + buf, err := reader.ReadBytes('\n') + if err == io.EOF { + break + } + + p.reset(buf) + point := Point{} + for _, element := range p.Elements { + err := element.parse(p, &point) + if err != nil { + return nil, err + } + } + + points = append(points, point) + } + + metrics, err := p.convertPointToTelegrafMetric(points) + if err != nil { + return nil, err + } + return metrics, nil +} + +func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) { + buf := []byte(line) + metrics, err := p.Parse(buf) + if err != nil { + return nil, err + } + + if len(metrics) > 0 { + return metrics[0], nil + } + + return nil, nil +} + +func (p *PointParser) SetDefaultTags(tags map[string]string) { + p.defaultTags = tags +} + +func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) { + + metrics := make([]telegraf.Metric, 0) + + for _, point := range points { + tags := make(map[string]string) + for k, v := range point.Tags { + tags[k] = v + } + // apply default tags after parsed tags + for k, v := range p.defaultTags { + tags[k] = v + } + + // single field for value + fields := make(map[string]interface{}) + v, err := strconv.ParseFloat(point.Value, 64) + if err != nil { + return nil, err + } + fields["value"] = v + + m, err := metric.New(point.Name, tags, fields, time.Unix(point.Timestamp, 0)) + if err != nil { + return nil, err + } + + metrics = append(metrics, m) + } + + return metrics, nil +} + +// scan returns the next token from the underlying scanner. +// If a token has been unscanned then read that from the internal buffer instead. +func (p *PointParser) scan() (Token, string) { + // If we have a token on the buffer, then return it. + if p.buf.n != 0 { + idx := p.buf.n % MAX_BUFFER_SIZE + tok, lit := p.buf.tok[idx], p.buf.lit[idx] + p.buf.n -= 1 + return tok, lit + } + + // Otherwise read the next token from the scanner. + tok, lit := p.s.Scan() + + // Save it to the buffer in case we unscan later. + p.buffer(tok, lit) + + return tok, lit +} + +func (p *PointParser) buffer(tok Token, lit string) { + // create the buffer if it is empty + if len(p.buf.tok) == 0 { + p.buf.tok = make([]Token, MAX_BUFFER_SIZE) + p.buf.lit = make([]string, MAX_BUFFER_SIZE) + } + + // for now assume a simple circular buffer of length two + p.buf.tok[0], p.buf.lit[0] = p.buf.tok[1], p.buf.lit[1] + p.buf.tok[1], p.buf.lit[1] = tok, lit +} + +// unscan pushes the previously read token back onto the buffer. +func (p *PointParser) unscan() { + p.unscanTokens(1) +} + +func (p *PointParser) unscanTokens(n int) { + if n > MAX_BUFFER_SIZE { + // just log for now + log.Printf("cannot unscan more than %d tokens", MAX_BUFFER_SIZE) + } + p.buf.n += n +} + +func (p *PointParser) reset(buf []byte) { + + // reset the scan buffer and write new byte + p.scanBuf.Reset() + p.scanBuf.Write(buf) + + if p.s == nil { + p.s = NewScanner(&p.scanBuf) + } else { + // reset p.s.r passing in the buffer as the reader + p.s.r.Reset(&p.scanBuf) + } + p.buf.n = 0 +} diff --git a/plugins/parsers/wavefront/parser_test.go b/plugins/parsers/wavefront/parser_test.go new file mode 100644 index 000000000..85367fa1a --- /dev/null +++ b/plugins/parsers/wavefront/parser_test.go @@ -0,0 +1,204 @@ +package wavefront + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +func TestParse(t *testing.T) { + parser := NewWavefrontParser(nil) + + parsedMetrics, err := parser.Parse([]byte("test.metric 1")) + assert.NoError(t, err) + testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0)) + assert.NoError(t, err) + assert.Equal(t, parsedMetrics[0].Name(), testMetric.Name()) + assert.Equal(t, parsedMetrics[0].Fields(), testMetric.Fields()) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 source=\"mysource\"")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 ")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + +} + +func TestParseLine(t *testing.T) { + parser := NewWavefrontParser(nil) + + parsedMetric, err := parser.ParseLine("test.metric 1") + assert.NoError(t, err) + testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0)) + assert.NoError(t, err) + assert.Equal(t, parsedMetric.Name(), testMetric.Name()) + assert.Equal(t, parsedMetric.Fields(), testMetric.Fields()) + + parsedMetric, err = parser.ParseLine("test.metric 1 1530939936") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + + parsedMetric, err = parser.ParseLine("test.metric 1 1530939936 source=mysource") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + + parsedMetric, err = parser.ParseLine("\"test.metric\" 1.1234 1530939936 source=\"mysource\"") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + + parsedMetric, err = parser.ParseLine("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + + parsedMetric, err = parser.ParseLine("test.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 ") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) +} + +func TestParseMultiple(t *testing.T) { + parser := NewWavefrontParser(nil) + + parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936")) + assert.NoError(t, err) + testMetric1, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0)) + assert.NoError(t, err) + testMetric2, err := metric.New("test.metric2", map[string]string{}, map[string]interface{}{"value": 2.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetrics := []telegraf.Metric{testMetric1, testMetric2} + assert.Equal(t, parsedMetrics[0].Name(), testMetrics[0].Name()) + assert.Equal(t, parsedMetrics[0].Fields(), testMetrics[0].Fields()) + assert.EqualValues(t, parsedMetrics[1], testMetrics[1]) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource\n\"test.metric\" 1.1234 1530939936 source=\"mysource\"")) + assert.NoError(t, err) + testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetrics = []telegraf.Metric{testMetric1, testMetric2} + assert.EqualValues(t, parsedMetrics, testMetrics) + + parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2\ntest.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 ")) + assert.NoError(t, err) + testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetrics = []telegraf.Metric{testMetric1, testMetric2} + assert.EqualValues(t, parsedMetrics, testMetrics) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource\n\"test.metric\" 1.1234 1530939936 source=\"mysource\"\ntest.metric3 333 1530939936 tagit=valueit")) + assert.NoError(t, err) + testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetric3, err := metric.New("test.metric3", map[string]string{"tagit": "valueit"}, map[string]interface{}{"value": 333.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + testMetrics = []telegraf.Metric{testMetric1, testMetric2, testMetric3} + assert.EqualValues(t, parsedMetrics, testMetrics) + +} + +func TestParseSpecial(t *testing.T) { + parser := NewWavefrontParser(nil) + + parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936") + assert.NoError(t, err) + testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + + parsedMetric, err = parser.ParseLine("test.metric 1 1530939936 tag1=\"val\\\"ue1\"") + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"tag1": "val\\\"ue1"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetric, testMetric) + +} + +func TestParseInvalid(t *testing.T) { + parser := NewWavefrontParser(nil) + + _, err := parser.Parse([]byte("test.metric")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("test.metric string")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("test.metric 1 string")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("test.metric 1 1530939936 tag_no_pair")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("test.metric 1 1530939936 tag_broken_value=\"")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("\"test.metric 1 1530939936")) + assert.Error(t, err) + + _, err = parser.Parse([]byte("test.metric 1 1530939936 tag1=val\\\"ue1")) + assert.Error(t, err) + +} + +func TestParseDefaultTags(t *testing.T) { + parser := NewWavefrontParser(map[string]string{"myDefault": "value1", "another": "test2"}) + + parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936")) + assert.NoError(t, err) + testMetric, err := metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2", "source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + + parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 another=\"test3\"")) + assert.NoError(t, err) + testMetric, err = metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0)) + assert.NoError(t, err) + assert.EqualValues(t, parsedMetrics[0], testMetric) + +} diff --git a/plugins/parsers/wavefront/scanner.go b/plugins/parsers/wavefront/scanner.go new file mode 100644 index 000000000..e64516f54 --- /dev/null +++ b/plugins/parsers/wavefront/scanner.go @@ -0,0 +1,69 @@ +package wavefront + +import ( + "bufio" + "io" +) + +// Lexical Point Scanner +type PointScanner struct { + r *bufio.Reader +} + +func NewScanner(r io.Reader) *PointScanner { + return &PointScanner{r: bufio.NewReader(r)} +} + +// read reads the next rune from the buffered reader. +// Returns rune(0) if an error occurs (or io.EOF is returned). +func (s *PointScanner) read() rune { + ch, _, err := s.r.ReadRune() + if err != nil { + return eof + } + return ch +} + +// unread places the previously read rune back on the reader. +func (s *PointScanner) unread() { + _ = s.r.UnreadRune() +} + +// Scan returns the next token and literal value. +func (s *PointScanner) Scan() (Token, string) { + + // Read the next rune + ch := s.read() + if isWhitespace(ch) { + return WS, string(ch) + } else if isLetter(ch) { + return LETTER, string(ch) + } else if isNumber(ch) { + return NUMBER, string(ch) + } + + // Otherwise read the individual character. + switch ch { + case eof: + return EOF, "" + case '\n': + return NEWLINE, string(ch) + case '.': + return DOT, string(ch) + case '-': + return MINUS_SIGN, string(ch) + case '_': + return UNDERSCORE, string(ch) + case '/': + return SLASH, string(ch) + case '\\': + return BACKSLASH, string(ch) + case ',': + return COMMA, string(ch) + case '"': + return QUOTES, string(ch) + case '=': + return EQUALS, string(ch) + } + return ILLEGAL, string(ch) +} diff --git a/plugins/parsers/wavefront/token.go b/plugins/parsers/wavefront/token.go new file mode 100644 index 000000000..bbcbf4e76 --- /dev/null +++ b/plugins/parsers/wavefront/token.go @@ -0,0 +1,41 @@ +package wavefront + +type Token int + +const ( + // Special tokens + ILLEGAL Token = iota + EOF + WS + + // Literals + literal_beg + LETTER // metric name, source/point tags + NUMBER + MINUS_SIGN + UNDERSCORE + DOT + SLASH + BACKSLASH + COMMA + literal_end + + // Misc characters + QUOTES + EQUALS + NEWLINE +) + +func isWhitespace(ch rune) bool { + return ch == ' ' || ch == '\t' || ch == '\n' +} + +func isLetter(ch rune) bool { + return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') +} + +func isNumber(ch rune) bool { + return ch >= '0' && ch <= '9' +} + +var eof = rune(0)