Fix race condition in the Wavefront parser (#5764)

This commit is contained in:
Pontus Rydin 2019-04-25 15:22:48 -04:00 committed by Daniel Nelson
parent f5b44fd0bd
commit f32b064d6a
1 changed files with 46 additions and 23 deletions

View File

@ -6,6 +6,7 @@ import (
"io"
"log"
"strconv"
"sync"
"time"
"github.com/influxdata/telegraf"
@ -22,7 +23,12 @@ type Point struct {
Tags map[string]string
}
// Parser represents a parser.
type WavefrontParser struct {
parsers *sync.Pool
defaultTags map[string]string
}
// PointParser is a thread-unsafe parser and must be kept in a pool.
type PointParser struct {
s *PointScanner
buf struct {
@ -30,10 +36,10 @@ type PointParser struct {
lit []string // last read n literals
n int // unscanned buffer size (max=2)
}
scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser
defaultTags map[string]string
scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser
parent *WavefrontParser
}
// Returns a slice of ElementParser's for the Graphite format
@ -47,9 +53,40 @@ func NewWavefrontElements() []ElementParser {
return elements
}
func NewWavefrontParser(defaultTags map[string]string) *PointParser {
func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser {
wp := &WavefrontParser{defaultTags: defaultTags}
wp.parsers = &sync.Pool{
New: func() interface{} {
return NewPointParser(wp)
},
}
return wp
}
func NewPointParser(parent *WavefrontParser) *PointParser {
elements := NewWavefrontElements()
return &PointParser{Elements: elements, defaultTags: defaultTags}
return &PointParser{Elements: elements, parent: parent}
}
func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line)
metrics, err := p.Parse(buf)
if err != nil {
return nil, err
}
if len(metrics) > 0 {
return metrics[0], nil
}
return nil, nil
}
func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) {
pp := p.parsers.Get().(*PointParser)
defer p.parsers.Put(pp)
return pp.Parse(buf)
}
func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
@ -91,21 +128,7 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}
func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line)
metrics, err := p.Parse(buf)
if err != nil {
return nil, err
}
if len(metrics) > 0 {
return metrics[0], nil
}
return nil, nil
}
func (p *PointParser) SetDefaultTags(tags map[string]string) {
func (p *WavefrontParser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags
}
@ -119,7 +142,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M
tags[k] = v
}
// apply default tags after parsed tags
for k, v := range p.defaultTags {
for k, v := range p.parent.defaultTags {
tags[k] = v
}