From 03a69106896c74b5ec3921619ada8699453a0127 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Mon, 2 Dec 2019 12:49:04 -0600 Subject: [PATCH] perf(inputs/influxdb_listener): benchmark serving writes (#6673) * perf(inputs/influxdb_listener): benchmark serving writes * chore(inputs/influxdb_listener): remove stray comment --- .../influxdb_listener_test.go | 114 ++++++++++++++++++ testutil/accumulator.go | 19 +++ 2 files changed, 133 insertions(+) create mode 100644 plugins/inputs/influxdb_listener/influxdb_listener_test.go diff --git a/plugins/inputs/influxdb_listener/influxdb_listener_test.go b/plugins/inputs/influxdb_listener/influxdb_listener_test.go new file mode 100644 index 000000000..5badc1213 --- /dev/null +++ b/plugins/inputs/influxdb_listener/influxdb_listener_test.go @@ -0,0 +1,114 @@ +package http_listener + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" + "github.com/influxdata/telegraf/testutil" +) + +// newListener is the minimal HTTPListener construction to serve writes. +func newListener() *HTTPListener { + listener := &HTTPListener{ + TimeFunc: time.Now, + acc: &testutil.NopAccumulator{}, + BytesRecv: selfstat.Register("http_listener", "bytes_received", map[string]string{}), + handler: influx.NewMetricHandler(), + pool: NewPool(200, DEFAULT_MAX_LINE_SIZE), + MaxLineSize: internal.Size{ + Size: DEFAULT_MAX_LINE_SIZE, + }, + MaxBodySize: internal.Size{ + Size: DEFAULT_MAX_BODY_SIZE, + }, + } + listener.parser = influx.NewParser(listener.handler) + return listener +} + +func BenchmarkHTTPListener_serveWrite(b *testing.B) { + res := httptest.NewRecorder() + addr := "http://localhost/write?db=mydb" + + benchmarks := []struct { + name string + lines string + }{ + { + name: "single line, tag, and field", + lines: lines(1, 1, 1), + }, + { + name: "single line, 10 tags and fields", + lines: lines(1, 10, 10), + }, + { + name: "single line, 100 tags and fields", + lines: lines(1, 100, 100), + }, + { + name: "1k lines, single tag and field", + lines: lines(1000, 1, 1), + }, + { + name: "1k lines, 10 tags and fields", + lines: lines(1000, 10, 10), + }, + { + name: "10k lines, 10 tags and fields", + lines: lines(10000, 10, 10), + }, + { + name: "100k lines, 10 tags and fields", + lines: lines(100000, 10, 10), + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + listener := newListener() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + req, err := http.NewRequest("POST", addr, strings.NewReader(bm.lines)) + if err != nil { + b.Error(err) + } + listener.serveWrite(res, req) + if res.Code != http.StatusNoContent { + b.Errorf("unexpected status %d", res.Code) + } + } + }) + } +} + +func lines(lines, numTags, numFields int) string { + lp := make([]string, lines) + for i := 0; i < lines; i++ { + tags := make([]string, numTags) + for j := 0; j < numTags; j++ { + tags[j] = fmt.Sprintf("t%d=v%d", j, j) + } + + fields := make([]string, numFields) + for k := 0; k < numFields; k++ { + fields[k] = fmt.Sprintf("f%d=%d", k, k) + } + + lp[i] = fmt.Sprintf("m%d,%s %s", + i, + strings.Join(tags, ","), + strings.Join(fields, ","), + ) + } + + return strings.Join(lp, "\n") +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 64c3d19fe..65592b5a0 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -715,3 +715,22 @@ func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) { return false, false } + +// NopAccumulator is used for benchmarking to isolate the plugin from the internal +// telegraf accumulator machinary. +type NopAccumulator struct{} + +func (n *NopAccumulator) AddFields(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { +} +func (n *NopAccumulator) AddGauge(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { +} +func (n *NopAccumulator) AddCounter(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { +} +func (n *NopAccumulator) AddSummary(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { +} +func (n *NopAccumulator) AddHistogram(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) { +} +func (n *NopAccumulator) AddMetric(telegraf.Metric) {} +func (n *NopAccumulator) SetPrecision(precision time.Duration) {} +func (n *NopAccumulator) AddError(err error) {} +func (n *NopAccumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { return nil }