Add Wavefront parser (#4402)
This commit is contained in:
parent
b9ff1d042b
commit
6454319062
|
@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
|
|||
1. [Collectd](#collectd)
|
||||
1. [Dropwizard](#dropwizard)
|
||||
1. [Grok](#grok)
|
||||
1. [Wavefront](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#wavefront)
|
||||
|
||||
Telegraf metrics, like InfluxDB
|
||||
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
|
||||
|
@ -881,3 +882,29 @@ the file output will only print once per `flush_interval`.
|
|||
- Continue one token at a time until the entire line is successfully parsed.
|
||||
|
||||
|
||||
```
|
||||
|
||||
# Wavefront:
|
||||
|
||||
Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
|
||||
For more information about the Wavefront Data Format see
|
||||
[here](https://docs.wavefront.com/wavefront_data_format.html).
|
||||
|
||||
There are no additional configuration options for Wavefront Data Format line-protocol.
|
||||
|
||||
#### Wavefront Configuration:
|
||||
|
||||
```toml
|
||||
[[inputs.exec]]
|
||||
## Commands array
|
||||
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
|
||||
|
||||
## measurement name suffix (for separating different commands)
|
||||
name_suffix = "_mycollector"
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "wavefront"
|
||||
```
|
||||
|
|
|
@ -189,26 +189,32 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
|
|||
}
|
||||
|
||||
var source string
|
||||
sourceTagFound := false
|
||||
|
||||
for _, s := range w.SourceOverride {
|
||||
for k, v := range mTags {
|
||||
if k == s {
|
||||
source = v
|
||||
mTags["telegraf_host"] = mTags["host"]
|
||||
sourceTagFound = true
|
||||
delete(mTags, k)
|
||||
if s, ok := mTags["source"]; ok {
|
||||
source = s
|
||||
delete(mTags, "source")
|
||||
} else {
|
||||
sourceTagFound := false
|
||||
for _, s := range w.SourceOverride {
|
||||
for k, v := range mTags {
|
||||
if k == s {
|
||||
source = v
|
||||
mTags["telegraf_host"] = mTags["host"]
|
||||
sourceTagFound = true
|
||||
delete(mTags, k)
|
||||
break
|
||||
}
|
||||
}
|
||||
if sourceTagFound {
|
||||
break
|
||||
}
|
||||
}
|
||||
if sourceTagFound {
|
||||
break
|
||||
|
||||
if !sourceTagFound {
|
||||
source = mTags["host"]
|
||||
}
|
||||
}
|
||||
|
||||
if !sourceTagFound {
|
||||
source = mTags["host"]
|
||||
}
|
||||
delete(mTags, "host")
|
||||
|
||||
return tagValueReplacer.Replace(source), mTags
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/nagios"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/value"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
|
||||
)
|
||||
|
||||
// ParserInput is an interface for input plugins that are able to parse
|
||||
|
@ -131,6 +132,8 @@ func NewParser(config *Config) (Parser, error) {
|
|||
config.DefaultTags,
|
||||
config.Separator,
|
||||
config.Templates)
|
||||
case "wavefront":
|
||||
parser, err = NewWavefrontParser(config.DefaultTags)
|
||||
case "grok":
|
||||
parser, err = newGrokParser(
|
||||
config.MetricName,
|
||||
|
@ -238,3 +241,7 @@ func NewDropwizardParser(
|
|||
}
|
||||
return parser, err
|
||||
}
|
||||
|
||||
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
|
||||
return wavefront.NewWavefrontParser(defaultTags), nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,238 @@
|
|||
package wavefront
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrEOF = errors.New("EOF")
|
||||
ErrInvalidTimestamp = errors.New("Invalid timestamp")
|
||||
)
|
||||
|
||||
// Interface for parsing line elements.
|
||||
type ElementParser interface {
|
||||
parse(p *PointParser, pt *Point) error
|
||||
}
|
||||
|
||||
type NameParser struct{}
|
||||
type ValueParser struct{}
|
||||
type TimestampParser struct {
|
||||
optional bool
|
||||
}
|
||||
type WhiteSpaceParser struct {
|
||||
nextOptional bool
|
||||
}
|
||||
type TagParser struct{}
|
||||
type LoopedParser struct {
|
||||
wrappedParser ElementParser
|
||||
wsPaser *WhiteSpaceParser
|
||||
}
|
||||
type LiteralParser struct {
|
||||
literal string
|
||||
}
|
||||
|
||||
func (ep *NameParser) parse(p *PointParser, pt *Point) error {
|
||||
//Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot (".").
|
||||
// Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes.
|
||||
name, err := parseLiteral(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pt.Name = name
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *ValueParser) parse(p *PointParser, pt *Point) error {
|
||||
tok, lit := p.scan()
|
||||
if tok == EOF {
|
||||
return fmt.Errorf("found %q, expected number", lit)
|
||||
}
|
||||
|
||||
p.writeBuf.Reset()
|
||||
if tok == MINUS_SIGN {
|
||||
p.writeBuf.WriteString(lit)
|
||||
tok, lit = p.scan()
|
||||
}
|
||||
|
||||
for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) {
|
||||
p.writeBuf.WriteString(lit)
|
||||
tok, lit = p.scan()
|
||||
}
|
||||
p.unscan()
|
||||
|
||||
pt.Value = p.writeBuf.String()
|
||||
_, err := strconv.ParseFloat(pt.Value, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid metric value %s", pt.Value)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *TimestampParser) parse(p *PointParser, pt *Point) error {
|
||||
tok, lit := p.scan()
|
||||
if tok == EOF {
|
||||
if ep.optional {
|
||||
p.unscanTokens(2)
|
||||
return setTimestamp(pt, 0, 1)
|
||||
}
|
||||
return fmt.Errorf("found %q, expected number", lit)
|
||||
}
|
||||
|
||||
if tok != NUMBER {
|
||||
if ep.optional {
|
||||
p.unscanTokens(2)
|
||||
return setTimestamp(pt, 0, 1)
|
||||
}
|
||||
return ErrInvalidTimestamp
|
||||
}
|
||||
|
||||
p.writeBuf.Reset()
|
||||
for tok != EOF && tok == NUMBER {
|
||||
p.writeBuf.WriteString(lit)
|
||||
tok, lit = p.scan()
|
||||
}
|
||||
p.unscan()
|
||||
|
||||
tsStr := p.writeBuf.String()
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return setTimestamp(pt, ts, len(tsStr))
|
||||
}
|
||||
|
||||
func setTimestamp(pt *Point, ts int64, numDigits int) error {
|
||||
|
||||
if numDigits == 19 {
|
||||
// nanoseconds
|
||||
ts = ts / 1e9
|
||||
} else if numDigits == 16 {
|
||||
// microseconds
|
||||
ts = ts / 1e6
|
||||
} else if numDigits == 13 {
|
||||
// milliseconds
|
||||
ts = ts / 1e3
|
||||
} else if numDigits != 10 {
|
||||
// must be in seconds, return error if not 0
|
||||
if ts == 0 {
|
||||
ts = getCurrentTime()
|
||||
} else {
|
||||
return ErrInvalidTimestamp
|
||||
}
|
||||
}
|
||||
pt.Timestamp = ts
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *LoopedParser) parse(p *PointParser, pt *Point) error {
|
||||
for {
|
||||
err := ep.wrappedParser.parse(p, pt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = ep.wsPaser.parse(p, pt)
|
||||
if err == ErrEOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *TagParser) parse(p *PointParser, pt *Point) error {
|
||||
k, err := parseLiteral(p)
|
||||
if err != nil {
|
||||
if k == "" {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
next, lit := p.scan()
|
||||
if next != EQUALS {
|
||||
return fmt.Errorf("found %q, expected equals", lit)
|
||||
}
|
||||
|
||||
v, err := parseLiteral(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(pt.Tags) == 0 {
|
||||
pt.Tags = make(map[string]string)
|
||||
}
|
||||
pt.Tags[k] = v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error {
|
||||
tok := WS
|
||||
for tok != EOF && tok == WS {
|
||||
tok, _ = p.scan()
|
||||
}
|
||||
|
||||
if tok == EOF {
|
||||
if !ep.nextOptional {
|
||||
return ErrEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
p.unscan()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *LiteralParser) parse(p *PointParser, pt *Point) error {
|
||||
l, err := parseLiteral(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if l != ep.literal {
|
||||
return fmt.Errorf("found %s, expected %s", l, ep.literal)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseQuotedLiteral(p *PointParser) (string, error) {
|
||||
p.writeBuf.Reset()
|
||||
|
||||
escaped := false
|
||||
tok, lit := p.scan()
|
||||
for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) {
|
||||
// let everything through
|
||||
escaped = tok == BACKSLASH
|
||||
p.writeBuf.WriteString(lit)
|
||||
tok, lit = p.scan()
|
||||
}
|
||||
if tok == EOF {
|
||||
return "", fmt.Errorf("found %q, expected quotes", lit)
|
||||
}
|
||||
return p.writeBuf.String(), nil
|
||||
}
|
||||
|
||||
func parseLiteral(p *PointParser) (string, error) {
|
||||
tok, lit := p.scan()
|
||||
if tok == EOF {
|
||||
return "", fmt.Errorf("found %q, expected literal", lit)
|
||||
}
|
||||
|
||||
if tok == QUOTES {
|
||||
return parseQuotedLiteral(p)
|
||||
}
|
||||
|
||||
p.writeBuf.Reset()
|
||||
for tok != EOF && tok > literal_beg && tok < literal_end {
|
||||
p.writeBuf.WriteString(lit)
|
||||
tok, lit = p.scan()
|
||||
}
|
||||
if tok == QUOTES {
|
||||
return "", errors.New("found quote inside unquoted literal")
|
||||
}
|
||||
p.unscan()
|
||||
return p.writeBuf.String(), nil
|
||||
}
|
||||
|
||||
func getCurrentTime() int64 {
|
||||
return time.Now().UnixNano() / 1e9
|
||||
}
|
|
@ -0,0 +1,203 @@
|
|||
package wavefront
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"io"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
const MAX_BUFFER_SIZE = 2
|
||||
|
||||
type Point struct {
|
||||
Name string
|
||||
Value string
|
||||
Timestamp int64
|
||||
Source string
|
||||
Tags map[string]string
|
||||
}
|
||||
|
||||
// Parser represents a parser.
|
||||
type PointParser struct {
|
||||
s *PointScanner
|
||||
buf struct {
|
||||
tok []Token // last read n tokens
|
||||
lit []string // last read n literals
|
||||
n int // unscanned buffer size (max=2)
|
||||
}
|
||||
scanBuf bytes.Buffer // buffer reused for scanning tokens
|
||||
writeBuf bytes.Buffer // buffer reused for parsing elements
|
||||
Elements []ElementParser
|
||||
defaultTags map[string]string
|
||||
}
|
||||
|
||||
// Returns a slice of ElementParser's for the Graphite format
|
||||
func NewWavefrontElements() []ElementParser {
|
||||
var elements []ElementParser
|
||||
wsParser := WhiteSpaceParser{}
|
||||
wsParserNextOpt := WhiteSpaceParser{nextOptional: true}
|
||||
repeatParser := LoopedParser{wrappedParser: &TagParser{}, wsPaser: &wsParser}
|
||||
elements = append(elements, &NameParser{}, &wsParser, &ValueParser{}, &wsParserNextOpt,
|
||||
&TimestampParser{optional: true}, &wsParserNextOpt, &repeatParser)
|
||||
return elements
|
||||
}
|
||||
|
||||
func NewWavefrontParser(defaultTags map[string]string) *PointParser {
|
||||
elements := NewWavefrontElements()
|
||||
return &PointParser{Elements: elements, defaultTags: defaultTags}
|
||||
}
|
||||
|
||||
func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
||||
// parse even if the buffer begins with a newline
|
||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
||||
// add newline to end if not exists:
|
||||
if len(buf) > 0 && !bytes.HasSuffix(buf, []byte("\n")) {
|
||||
buf = append(buf, []byte("\n")...)
|
||||
}
|
||||
|
||||
points := make([]Point, 0)
|
||||
|
||||
buffer := bytes.NewBuffer(buf)
|
||||
reader := bufio.NewReader(buffer)
|
||||
for {
|
||||
// Read up to the next newline.
|
||||
buf, err := reader.ReadBytes('\n')
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
p.reset(buf)
|
||||
point := Point{}
|
||||
for _, element := range p.Elements {
|
||||
err := element.parse(p, &point)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
points = append(points, point)
|
||||
}
|
||||
|
||||
metrics, err := p.convertPointToTelegrafMetric(points)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
buf := []byte(line)
|
||||
metrics, err := p.Parse(buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) > 0 {
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PointParser) SetDefaultTags(tags map[string]string) {
|
||||
p.defaultTags = tags
|
||||
}
|
||||
|
||||
func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) {
|
||||
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
|
||||
for _, point := range points {
|
||||
tags := make(map[string]string)
|
||||
for k, v := range point.Tags {
|
||||
tags[k] = v
|
||||
}
|
||||
// apply default tags after parsed tags
|
||||
for k, v := range p.defaultTags {
|
||||
tags[k] = v
|
||||
}
|
||||
|
||||
// single field for value
|
||||
fields := make(map[string]interface{})
|
||||
v, err := strconv.ParseFloat(point.Value, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fields["value"] = v
|
||||
|
||||
m, err := metric.New(point.Name, tags, fields, time.Unix(point.Timestamp, 0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// scan returns the next token from the underlying scanner.
|
||||
// If a token has been unscanned then read that from the internal buffer instead.
|
||||
func (p *PointParser) scan() (Token, string) {
|
||||
// If we have a token on the buffer, then return it.
|
||||
if p.buf.n != 0 {
|
||||
idx := p.buf.n % MAX_BUFFER_SIZE
|
||||
tok, lit := p.buf.tok[idx], p.buf.lit[idx]
|
||||
p.buf.n -= 1
|
||||
return tok, lit
|
||||
}
|
||||
|
||||
// Otherwise read the next token from the scanner.
|
||||
tok, lit := p.s.Scan()
|
||||
|
||||
// Save it to the buffer in case we unscan later.
|
||||
p.buffer(tok, lit)
|
||||
|
||||
return tok, lit
|
||||
}
|
||||
|
||||
func (p *PointParser) buffer(tok Token, lit string) {
|
||||
// create the buffer if it is empty
|
||||
if len(p.buf.tok) == 0 {
|
||||
p.buf.tok = make([]Token, MAX_BUFFER_SIZE)
|
||||
p.buf.lit = make([]string, MAX_BUFFER_SIZE)
|
||||
}
|
||||
|
||||
// for now assume a simple circular buffer of length two
|
||||
p.buf.tok[0], p.buf.lit[0] = p.buf.tok[1], p.buf.lit[1]
|
||||
p.buf.tok[1], p.buf.lit[1] = tok, lit
|
||||
}
|
||||
|
||||
// unscan pushes the previously read token back onto the buffer.
|
||||
func (p *PointParser) unscan() {
|
||||
p.unscanTokens(1)
|
||||
}
|
||||
|
||||
func (p *PointParser) unscanTokens(n int) {
|
||||
if n > MAX_BUFFER_SIZE {
|
||||
// just log for now
|
||||
log.Printf("cannot unscan more than %d tokens", MAX_BUFFER_SIZE)
|
||||
}
|
||||
p.buf.n += n
|
||||
}
|
||||
|
||||
func (p *PointParser) reset(buf []byte) {
|
||||
|
||||
// reset the scan buffer and write new byte
|
||||
p.scanBuf.Reset()
|
||||
p.scanBuf.Write(buf)
|
||||
|
||||
if p.s == nil {
|
||||
p.s = NewScanner(&p.scanBuf)
|
||||
} else {
|
||||
// reset p.s.r passing in the buffer as the reader
|
||||
p.s.r.Reset(&p.scanBuf)
|
||||
}
|
||||
p.buf.n = 0
|
||||
}
|
|
@ -0,0 +1,204 @@
|
|||
package wavefront
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParse(t *testing.T) {
|
||||
parser := NewWavefrontParser(nil)
|
||||
|
||||
parsedMetrics, err := parser.Parse([]byte("test.metric 1"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, parsedMetrics[0].Name(), testMetric.Name())
|
||||
assert.Equal(t, parsedMetrics[0].Fields(), testMetric.Fields())
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 source=\"mysource\""))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 "))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
}
|
||||
|
||||
func TestParseLine(t *testing.T) {
|
||||
parser := NewWavefrontParser(nil)
|
||||
|
||||
parsedMetric, err := parser.ParseLine("test.metric 1")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, parsedMetric.Name(), testMetric.Name())
|
||||
assert.Equal(t, parsedMetric.Fields(), testMetric.Fields())
|
||||
|
||||
parsedMetric, err = parser.ParseLine("test.metric 1 1530939936")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
parsedMetric, err = parser.ParseLine("test.metric 1 1530939936 source=mysource")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
parsedMetric, err = parser.ParseLine("\"test.metric\" 1.1234 1530939936 source=\"mysource\"")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
parsedMetric, err = parser.ParseLine("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
parsedMetric, err = parser.ParseLine("test.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 ")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
}
|
||||
|
||||
func TestParseMultiple(t *testing.T) {
|
||||
parser := NewWavefrontParser(nil)
|
||||
|
||||
parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936"))
|
||||
assert.NoError(t, err)
|
||||
testMetric1, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(0, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetric2, err := metric.New("test.metric2", map[string]string{}, map[string]interface{}{"value": 2.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetrics := []telegraf.Metric{testMetric1, testMetric2}
|
||||
assert.Equal(t, parsedMetrics[0].Name(), testMetrics[0].Name())
|
||||
assert.Equal(t, parsedMetrics[0].Fields(), testMetrics[0].Fields())
|
||||
assert.EqualValues(t, parsedMetrics[1], testMetrics[1])
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource\n\"test.metric\" 1.1234 1530939936 source=\"mysource\""))
|
||||
assert.NoError(t, err)
|
||||
testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetrics = []telegraf.Metric{testMetric1, testMetric2}
|
||||
assert.EqualValues(t, parsedMetrics, testMetrics)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 \"source\"=\"mysource\" tag2=value2\ntest.metric 1.1234 1530939936 source=\"mysource\" tag2=value2 "))
|
||||
assert.NoError(t, err)
|
||||
testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource", "tag2": "value2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetrics = []telegraf.Metric{testMetric1, testMetric2}
|
||||
assert.EqualValues(t, parsedMetrics, testMetrics)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource\n\"test.metric\" 1.1234 1530939936 source=\"mysource\"\ntest.metric3 333 1530939936 tagit=valueit"))
|
||||
assert.NoError(t, err)
|
||||
testMetric1, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetric2, err = metric.New("test.metric", map[string]string{"source": "mysource"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetric3, err := metric.New("test.metric3", map[string]string{"tagit": "valueit"}, map[string]interface{}{"value": 333.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
testMetrics = []telegraf.Metric{testMetric1, testMetric2, testMetric3}
|
||||
assert.EqualValues(t, parsedMetrics, testMetrics)
|
||||
|
||||
}
|
||||
|
||||
func TestParseSpecial(t *testing.T) {
|
||||
parser := NewWavefrontParser(nil)
|
||||
|
||||
parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err := metric.New("test.metric", map[string]string{}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
parsedMetric, err = parser.ParseLine("test.metric 1 1530939936 tag1=\"val\\\"ue1\"")
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"tag1": "val\\\"ue1"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetric, testMetric)
|
||||
|
||||
}
|
||||
|
||||
func TestParseInvalid(t *testing.T) {
|
||||
parser := NewWavefrontParser(nil)
|
||||
|
||||
_, err := parser.Parse([]byte("test.metric"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("test.metric string"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("test.metric 1 string"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("test.metric 1 1530939936 tag_no_pair"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("test.metric 1 1530939936 tag_broken_value=\""))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("\"test.metric 1 1530939936"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = parser.Parse([]byte("test.metric 1 1530939936 tag1=val\\\"ue1"))
|
||||
assert.Error(t, err)
|
||||
|
||||
}
|
||||
|
||||
func TestParseDefaultTags(t *testing.T) {
|
||||
parser := NewWavefrontParser(map[string]string{"myDefault": "value1", "another": "test2"})
|
||||
|
||||
parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err := metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("test.metric 1 1530939936 source=mysource"))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2", "source": "mysource"}, map[string]interface{}{"value": 1.}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
parsedMetrics, err = parser.Parse([]byte("\"test.metric\" 1.1234 1530939936 another=\"test3\""))
|
||||
assert.NoError(t, err)
|
||||
testMetric, err = metric.New("test.metric", map[string]string{"myDefault": "value1", "another": "test2"}, map[string]interface{}{"value": 1.1234}, time.Unix(1530939936, 0))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, parsedMetrics[0], testMetric)
|
||||
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package wavefront
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Lexical Point Scanner
|
||||
type PointScanner struct {
|
||||
r *bufio.Reader
|
||||
}
|
||||
|
||||
func NewScanner(r io.Reader) *PointScanner {
|
||||
return &PointScanner{r: bufio.NewReader(r)}
|
||||
}
|
||||
|
||||
// read reads the next rune from the buffered reader.
|
||||
// Returns rune(0) if an error occurs (or io.EOF is returned).
|
||||
func (s *PointScanner) read() rune {
|
||||
ch, _, err := s.r.ReadRune()
|
||||
if err != nil {
|
||||
return eof
|
||||
}
|
||||
return ch
|
||||
}
|
||||
|
||||
// unread places the previously read rune back on the reader.
|
||||
func (s *PointScanner) unread() {
|
||||
_ = s.r.UnreadRune()
|
||||
}
|
||||
|
||||
// Scan returns the next token and literal value.
|
||||
func (s *PointScanner) Scan() (Token, string) {
|
||||
|
||||
// Read the next rune
|
||||
ch := s.read()
|
||||
if isWhitespace(ch) {
|
||||
return WS, string(ch)
|
||||
} else if isLetter(ch) {
|
||||
return LETTER, string(ch)
|
||||
} else if isNumber(ch) {
|
||||
return NUMBER, string(ch)
|
||||
}
|
||||
|
||||
// Otherwise read the individual character.
|
||||
switch ch {
|
||||
case eof:
|
||||
return EOF, ""
|
||||
case '\n':
|
||||
return NEWLINE, string(ch)
|
||||
case '.':
|
||||
return DOT, string(ch)
|
||||
case '-':
|
||||
return MINUS_SIGN, string(ch)
|
||||
case '_':
|
||||
return UNDERSCORE, string(ch)
|
||||
case '/':
|
||||
return SLASH, string(ch)
|
||||
case '\\':
|
||||
return BACKSLASH, string(ch)
|
||||
case ',':
|
||||
return COMMA, string(ch)
|
||||
case '"':
|
||||
return QUOTES, string(ch)
|
||||
case '=':
|
||||
return EQUALS, string(ch)
|
||||
}
|
||||
return ILLEGAL, string(ch)
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package wavefront
|
||||
|
||||
type Token int
|
||||
|
||||
const (
|
||||
// Special tokens
|
||||
ILLEGAL Token = iota
|
||||
EOF
|
||||
WS
|
||||
|
||||
// Literals
|
||||
literal_beg
|
||||
LETTER // metric name, source/point tags
|
||||
NUMBER
|
||||
MINUS_SIGN
|
||||
UNDERSCORE
|
||||
DOT
|
||||
SLASH
|
||||
BACKSLASH
|
||||
COMMA
|
||||
literal_end
|
||||
|
||||
// Misc characters
|
||||
QUOTES
|
||||
EQUALS
|
||||
NEWLINE
|
||||
)
|
||||
|
||||
func isWhitespace(ch rune) bool {
|
||||
return ch == ' ' || ch == '\t' || ch == '\n'
|
||||
}
|
||||
|
||||
func isLetter(ch rune) bool {
|
||||
return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z')
|
||||
}
|
||||
|
||||
func isNumber(ch rune) bool {
|
||||
return ch >= '0' && ch <= '9'
|
||||
}
|
||||
|
||||
var eof = rune(0)
|
Loading…
Reference in New Issue