From f32b064d6a6f38c6c809b09ca8ae4ef53937e9d8 Mon Sep 17 00:00:00 2001 From: Pontus Rydin Date: Thu, 25 Apr 2019 15:22:48 -0400 Subject: [PATCH] Fix race condition in the Wavefront parser (#5764) --- plugins/parsers/wavefront/parser.go | 69 +++++++++++++++++++---------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/plugins/parsers/wavefront/parser.go b/plugins/parsers/wavefront/parser.go index f5fc88dbf..62fe8f5ef 100644 --- a/plugins/parsers/wavefront/parser.go +++ b/plugins/parsers/wavefront/parser.go @@ -6,6 +6,7 @@ import ( "io" "log" "strconv" + "sync" "time" "github.com/influxdata/telegraf" @@ -22,7 +23,12 @@ type Point struct { Tags map[string]string } -// Parser represents a parser. +type WavefrontParser struct { + parsers *sync.Pool + defaultTags map[string]string +} + +// PointParser is a thread-unsafe parser and must be kept in a pool. type PointParser struct { s *PointScanner buf struct { @@ -30,10 +36,10 @@ type PointParser struct { lit []string // last read n literals n int // unscanned buffer size (max=2) } - scanBuf bytes.Buffer // buffer reused for scanning tokens - writeBuf bytes.Buffer // buffer reused for parsing elements - Elements []ElementParser - defaultTags map[string]string + scanBuf bytes.Buffer // buffer reused for scanning tokens + writeBuf bytes.Buffer // buffer reused for parsing elements + Elements []ElementParser + parent *WavefrontParser } // Returns a slice of ElementParser's for the Graphite format @@ -47,9 +53,40 @@ func NewWavefrontElements() []ElementParser { return elements } -func NewWavefrontParser(defaultTags map[string]string) *PointParser { +func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser { + wp := &WavefrontParser{defaultTags: defaultTags} + wp.parsers = &sync.Pool{ + New: func() interface{} { + return NewPointParser(wp) + }, + } + return wp +} + +func NewPointParser(parent *WavefrontParser) *PointParser { elements := NewWavefrontElements() - return &PointParser{Elements: elements, defaultTags: defaultTags} + return &PointParser{Elements: elements, parent: parent} +} + +func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) { + buf := []byte(line) + + metrics, err := p.Parse(buf) + if err != nil { + return nil, err + } + + if len(metrics) > 0 { + return metrics[0], nil + } + + return nil, nil +} + +func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) { + pp := p.parsers.Get().(*PointParser) + defer p.parsers.Put(pp) + return pp.Parse(buf) } func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) { @@ -91,21 +128,7 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -func (p *PointParser) ParseLine(line string) (telegraf.Metric, error) { - buf := []byte(line) - metrics, err := p.Parse(buf) - if err != nil { - return nil, err - } - - if len(metrics) > 0 { - return metrics[0], nil - } - - return nil, nil -} - -func (p *PointParser) SetDefaultTags(tags map[string]string) { +func (p *WavefrontParser) SetDefaultTags(tags map[string]string) { p.defaultTags = tags } @@ -119,7 +142,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M tags[k] = v } // apply default tags after parsed tags - for k, v := range p.defaultTags { + for k, v := range p.parent.defaultTags { tags[k] = v }