From 4451e1620386105f211306979b63bc7c46e1a759 Mon Sep 17 00:00:00 2001 From: Alex Russell-Saw Date: Mon, 10 Oct 2016 17:05:11 +0100 Subject: [PATCH] plugins/inputs/statsd: optimise byte slice and string usage in parser --- plugins/inputs/statsd/statsd.go | 78 +++++++----- plugins/inputs/statsd/statsd_test.go | 178 +++++++++++++-------------- 2 files changed, 137 insertions(+), 119 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index a46af0a87..c92b92bdc 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -1,15 +1,18 @@ package statsd import ( + "bytes" "errors" "fmt" "log" "net" + "reflect" "sort" "strconv" "strings" "sync" "time" + "unsafe" "github.com/influxdata/telegraf/plugins/parsers/graphite" @@ -317,10 +320,10 @@ func (s *Statsd) parser() error { case <-s.done: return nil case packet = <-s.in: - lines := strings.Split(string(packet), "\n") + lines := bytes.Split(packet, []byte{'\n'}) for _, line := range lines { - line = strings.TrimSpace(line) - if line != "" { + line = bytes.TrimSpace(line) + if len(line) != 0 { s.parseStatsdLine(line) } } @@ -328,51 +331,57 @@ func (s *Statsd) parser() error { } } +func bytesToString(b []byte) string { + bh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + sh := reflect.StringHeader{Data: bh.Data, Len: bh.Len} + return *(*string)(unsafe.Pointer(&sh)) + "" +} + // parseStatsdLine will parse the given statsd line, validating it as it goes. // If the line is valid, it will be cached for the next call to Gather() -func (s *Statsd) parseStatsdLine(line string) error { +func (s *Statsd) parseStatsdLine(line []byte) error { s.Lock() defer s.Unlock() - lineTags := make(map[string]string) + lineTags := make(map[string][]byte) if s.ParseDataDogTags { - recombinedSegments := make([]string, 0) + var recombinedSegments [][]byte // datadog tags look like this: // users.online:1|c|@0.5|#country:china,environment:production // users.online:1|c|#sometagwithnovalue // we will split on the pipe and remove any elements that are datadog // tags, parse them, and rebuild the line sans the datadog tags - pipesplit := strings.Split(line, "|") + pipesplit := bytes.Split(line, []byte{'|'}) for _, segment := range pipesplit { if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated tagstr := segment[1:] - tags := strings.Split(tagstr, ",") + tags := bytes.Split(tagstr, []byte{','}) for _, tag := range tags { - ts := strings.SplitN(tag, ":", 2) - var k, v string + ts := bytes.SplitN(tag, []byte{':'}, 2) + var k, v []byte switch len(ts) { case 1: // just a tag k = ts[0] - v = "" + v = []byte{} case 2: k = ts[0] v = ts[1] } - if k != "" { - lineTags[k] = v + if len(k) != 0 { + lineTags[string(k)] = v } } } else { recombinedSegments = append(recombinedSegments, segment) } } - line = strings.Join(recombinedSegments, "|") + line = bytes.Join(recombinedSegments, []byte{'|'}) } // Validate splitting the line on ":" - bits := strings.Split(line, ":") + bits := bytes.Split(line, []byte{':'}) if len(bits) < 2 { log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") @@ -385,10 +394,10 @@ func (s *Statsd) parseStatsdLine(line string) error { for _, bit := range bits { m := metric{} - m.bucket = bucketName + m.bucket = bytesToString(bucketName) // Validate splitting the bit on "|" - pipesplit := strings.Split(bit, "|") + pipesplit := bytes.Split(bit, []byte{'|'}) if len(pipesplit) < 2 { log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") @@ -396,8 +405,8 @@ func (s *Statsd) parseStatsdLine(line string) error { sr := pipesplit[2] errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" - if strings.Contains(sr, "@") && len(sr) > 1 { - samplerate, err := strconv.ParseFloat(sr[1:], 64) + if bytes.IndexByte(sr, '@') != -1 && len(sr) > 1 { + samplerate, err := strconv.ParseFloat(string(sr[1:]), 64) if err != nil { log.Printf(errmsg, err.Error(), line) } else { @@ -409,17 +418,21 @@ func (s *Statsd) parseStatsdLine(line string) error { } } + // avoid loads of []byte <> string conversions + strPipeSplit0 := bytesToString(pipesplit[0]) + strPipeSplit1 := bytesToString(pipesplit[1]) + // Validate metric type - switch pipesplit[1] { + switch strPipeSplit1 { case "g", "c", "s", "ms", "h": - m.mtype = pipesplit[1] + m.mtype = strPipeSplit1 default: - log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1]) + log.Printf("E! Error: Statsd Metric type %s unsupported", strPipeSplit1) return errors.New("Error Parsing statsd line") } // Parse the value - if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") { + if pipesplit[0][0] == '-' || pipesplit[0][0] == '+' { if m.mtype != "g" { log.Printf("E! Error: +- values are only supported for gauges: %s\n", line) return errors.New("Error Parsing statsd line") @@ -429,7 +442,7 @@ func (s *Statsd) parseStatsdLine(line string) error { switch m.mtype { case "g", "ms", "h": - v, err := strconv.ParseFloat(pipesplit[0], 64) + v, err := strconv.ParseFloat(strPipeSplit0, 64) if err != nil { log.Printf("E! Error: parsing value to float64: %s\n", line) return errors.New("Error Parsing statsd line") @@ -437,9 +450,9 @@ func (s *Statsd) parseStatsdLine(line string) error { m.floatvalue = v case "c", "s": var v int64 - v, err := strconv.ParseInt(pipesplit[0], 10, 64) + v, err := strconv.ParseInt(strPipeSplit0, 10, 64) if err != nil { - v2, err2 := strconv.ParseFloat(pipesplit[0], 64) + v2, err2 := strconv.ParseFloat(strPipeSplit0, 64) if err2 != nil { log.Printf("E! Error: parsing value to int64: %s\n", line) return errors.New("Error Parsing statsd line") @@ -470,17 +483,22 @@ func (s *Statsd) parseStatsdLine(line string) error { if len(lineTags) > 0 { for k, v := range lineTags { - m.tags[k] = v + m.tags[k] = bytesToString(v) } } // Make a unique key for the measurement name/tags - var tg []string + var tg = make([]string, len(m.tags)) + i := 0 for k, v := range m.tags { - tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + tg[i] = k + "=" + v + i++ } sort.Strings(tg) - m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) + for i := range tg { + m.hash = m.hash + tg[i] + } + m.hash = m.hash + m.name s.aggregate(m) } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index bb0d68c16..c5ef26392 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -36,7 +36,7 @@ func TestParse_ValidLines(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -67,7 +67,7 @@ func TestParse_Gauges(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -142,7 +142,7 @@ func TestParse_Sets(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -193,7 +193,7 @@ func TestParse_Counters(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -249,7 +249,7 @@ func TestParse_Timings(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -278,7 +278,7 @@ func TestParseScientificNotation(t *testing.T) { "scientific.notation:4.6968460083008E-5|h", } for _, line := range sciNotationLines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line [%s] should not have resulted in error: %s\n", line, err) } @@ -302,7 +302,7 @@ func TestParse_InvalidLines(t *testing.T) { "invalid.value:1d1|c", } for _, line := range invalid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err == nil { t.Errorf("Parsing line %s should have resulted in an error\n", line) } @@ -320,7 +320,7 @@ func TestParse_InvalidSampleRate(t *testing.T) { } for _, line := range invalid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -370,7 +370,7 @@ func TestParse_DefaultNameParsing(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -411,7 +411,7 @@ func TestParse_Template(t *testing.T) { } for _, line := range lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -453,7 +453,7 @@ func TestParse_TemplateFilter(t *testing.T) { } for _, line := range lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -495,7 +495,7 @@ func TestParse_TemplateSpecificity(t *testing.T) { } for _, line := range lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -543,7 +543,7 @@ func TestParse_TemplateFields(t *testing.T) { } for _, line := range lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -721,7 +721,7 @@ func TestParse_DataDogTags(t *testing.T) { } for _, line := range lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -836,7 +836,7 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { } for _, line := range valid_lines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -889,14 +889,14 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { s_multiple := NewTestStatsd() for _, line := range single_lines { - err := s_single.parseStatsdLine(line) + err := s_single.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } for _, line := range multiple_lines { - err := s_multiple.parseStatsdLine(line) + err := s_multiple.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -999,7 +999,7 @@ func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { } for _, line := range validLines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -1034,17 +1034,17 @@ func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { s.Percentiles = []int{90} acc := &testutil.Accumulator{} - validLines := []string{ - "test_timing.success:1|ms", - "test_timing.success:11|ms", - "test_timing.success:1|ms", - "test_timing.success:1|ms", - "test_timing.success:1|ms", - "test_timing.error:2|ms", - "test_timing.error:22|ms", - "test_timing.error:2|ms", - "test_timing.error:2|ms", - "test_timing.error:2|ms", + validLines := [][]byte{ + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:11|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:22|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), } for _, line := range validLines { @@ -1078,17 +1078,17 @@ func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { func BenchmarkParse(b *testing.B) { s := NewTestStatsd() - validLines := []string{ - "test.timing.success:1|ms", - "test.timing.success:11|ms", - "test.timing.success:1|ms", - "test.timing.success:1|ms", - "test.timing.success:1|ms", - "test.timing.error:2|ms", - "test.timing.error:22|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", + validLines := [][]byte{ + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:11|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:22|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), } for n := 0; n < b.N; n++ { for _, line := range validLines { @@ -1103,17 +1103,17 @@ func BenchmarkParse(b *testing.B) { func BenchmarkParseWithTemplate(b *testing.B) { s := NewTestStatsd() s.Templates = []string{"measurement.measurement.field"} - validLines := []string{ - "test.timing.success:1|ms", - "test.timing.success:11|ms", - "test.timing.success:1|ms", - "test.timing.success:1|ms", - "test.timing.success:1|ms", - "test.timing.error:2|ms", - "test.timing.error:22|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", + validLines := [][]byte{ + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:11|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:22|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), } for n := 0; n < b.N; n++ { for _, line := range validLines { @@ -1128,17 +1128,17 @@ func BenchmarkParseWithTemplate(b *testing.B) { func BenchmarkParseWithTemplateAndFilter(b *testing.B) { s := NewTestStatsd() s.Templates = []string{"cpu* measurement.measurement.field"} - validLines := []string{ - "test.timing.success:1|ms", - "test.timing.success:11|ms", - "test.timing.success:1|ms", - "cpu.timing.success:1|ms", - "cpu.timing.success:1|ms", - "cpu.timing.error:2|ms", - "cpu.timing.error:22|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", + validLines := [][]byte{ + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:11|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:22|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), } for n := 0; n < b.N; n++ { for _, line := range validLines { @@ -1156,21 +1156,21 @@ func BenchmarkParseWith2TemplatesAndFilter(b *testing.B) { "cpu1* measurement.measurement.field", "cpu2* measurement.measurement.field", } - validLines := []string{ - "test.timing.success:1|ms", - "test.timing.success:11|ms", - "test.timing.success:1|ms", - "cpu1.timing.success:1|ms", - "cpu1.timing.success:1|ms", - "cpu2.timing.error:2|ms", - "cpu2.timing.error:22|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", - "test.timing.error:2|ms", + validLines := [][]byte{ + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:11|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.success:1|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:22|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), + []byte("test_timing.error:2|ms"), } for n := 0; n < b.N; n++ { for _, line := range validLines { - err := s.parseStatsdLine(line) + err := s.parseStatsdLine([]byte(line)) if err != nil { b.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -1184,17 +1184,17 @@ func BenchmarkParseWith2Templates3TagsAndFilter(b *testing.B) { "cpu1* measurement.measurement.region.city.rack.field", "cpu2* measurement.measurement.region.city.rack.field", } - validLines := []string{ - "test.timing.us-east.nyc.rack01.success:1|ms", - "test.timing.us-east.nyc.rack01.success:11|ms", - "test.timing.us-west.sf.rack01.success:1|ms", - "cpu1.timing.us-west.sf.rack01.success:1|ms", - "cpu1.timing.us-east.nyc.rack01.success:1|ms", - "cpu2.timing.us-east.nyc.rack01.error:2|ms", - "cpu2.timing.us-west.sf.rack01.error:22|ms", - "test.timing.us-west.sf.rack01.error:2|ms", - "test.timing.us-west.sf.rack01.error:2|ms", - "test.timing.us-east.nyc.rack01.error:2|ms", + validLines := [][]byte{ + []byte("test.timing.us-east.nyc.rack01.success:1|ms"), + []byte("test.timing.us-east.nyc.rack01.success:11|ms"), + []byte("test.timing.us-west.sf.rack01.success:1|ms"), + []byte("cpu1.timing.us-west.sf.rack01.success:1|ms"), + []byte("cpu1.timing.us-east.nyc.rack01.success:1|ms"), + []byte("cpu2.timing.us-east.nyc.rack01.error:2|ms"), + []byte("cpu2.timing.us-west.sf.rack01.error:22|ms"), + []byte("test.timing.us-west.sf.rack01.error:2|ms"), + []byte("test.timing.us-west.sf.rack01.error:2|ms"), + []byte("test.timing.us-east.nyc.rack01.error:2|ms"), } for n := 0; n < b.N; n++ { for _, line := range validLines { @@ -1213,7 +1213,7 @@ func TestParse_Timings_Delete(t *testing.T) { var err error line := "timing:100|ms" - err = s.parseStatsdLine(line) + err = s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -1237,7 +1237,7 @@ func TestParse_Gauges_Delete(t *testing.T) { var err error line := "current.users:100|g" - err = s.parseStatsdLine(line) + err = s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -1263,7 +1263,7 @@ func TestParse_Sets_Delete(t *testing.T) { var err error line := "unique.user.ids:100|s" - err = s.parseStatsdLine(line) + err = s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } @@ -1289,7 +1289,7 @@ func TestParse_Counters_Delete(t *testing.T) { var err error line := "total.users:100|c" - err = s.parseStatsdLine(line) + err = s.parseStatsdLine([]byte(line)) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) }