From 4301b8e32ab782a3a52d20d229929bd0e86a5af8 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 5 Oct 2017 16:14:21 -0700 Subject: [PATCH] Use chunked transfer encoding in InfluxDB output (#3307) (cherry picked from commit cce40c515afc2e9a03c073dc627cb02f88da80db) --- plugins/outputs/influxdb/client/client.go | 8 +- plugins/outputs/influxdb/client/http.go | 58 +--------- plugins/outputs/influxdb/client/http_test.go | 112 ++++++------------- plugins/outputs/influxdb/client/udp.go | 27 +---- plugins/outputs/influxdb/client/udp_test.go | 50 +-------- plugins/outputs/influxdb/influxdb.go | 8 +- 6 files changed, 52 insertions(+), 211 deletions(-) diff --git a/plugins/outputs/influxdb/client/client.go b/plugins/outputs/influxdb/client/client.go index 3f52752ad..4bcaceb74 100644 --- a/plugins/outputs/influxdb/client/client.go +++ b/plugins/outputs/influxdb/client/client.go @@ -4,13 +4,7 @@ import "io" type Client interface { Query(command string) error - - Write(b []byte) (int, error) - WriteWithParams(b []byte, params WriteParams) (int, error) - - WriteStream(b io.Reader, contentLength int) (int, error) - WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error) - + WriteStream(b io.Reader) error Close() error } diff --git a/plugins/outputs/influxdb/client/http.go b/plugins/outputs/influxdb/client/http.go index d8c1951f5..98be4bf31 100644 --- a/plugins/outputs/influxdb/client/http.go +++ b/plugins/outputs/influxdb/client/http.go @@ -136,60 +136,13 @@ func (c *httpClient) Query(command string) error { return c.doRequest(req, http.StatusOK) } -func (c *httpClient) Write(b []byte) (int, error) { - req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL) +func (c *httpClient) WriteStream(r io.Reader) error { + req, err := c.makeWriteRequest(r, c.writeURL) if err != nil { - return 0, nil + return err } - err = c.doRequest(req, http.StatusNoContent) - if err == nil { - return len(b), nil - } - return 0, err -} - -func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { - req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp)) - if err != nil { - return 0, nil - } - - err = c.doRequest(req, http.StatusNoContent) - if err == nil { - return len(b), nil - } - return 0, err -} - -func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) { - req, err := c.makeWriteRequest(r, contentLength, c.writeURL) - if err != nil { - return 0, nil - } - - err = c.doRequest(req, http.StatusNoContent) - if err == nil { - return contentLength, nil - } - return 0, err -} - -func (c *httpClient) WriteStreamWithParams( - r io.Reader, - contentLength int, - wp WriteParams, -) (int, error) { - req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp)) - if err != nil { - return 0, nil - } - - err = c.doRequest(req, http.StatusNoContent) - if err == nil { - return contentLength, nil - } - return 0, err + return c.doRequest(req, http.StatusNoContent) } func (c *httpClient) doRequest( @@ -231,7 +184,6 @@ func (c *httpClient) doRequest( func (c *httpClient) makeWriteRequest( body io.Reader, - contentLength int, writeURL string, ) (*http.Request, error) { req, err := c.makeRequest(writeURL, body) @@ -240,8 +192,6 @@ func (c *httpClient) makeWriteRequest( } if c.config.ContentEncoding == "gzip" { req.Header.Set("Content-Encoding", "gzip") - } else { - req.Header.Set("Content-Length", fmt.Sprint(contentLength)) } return req, nil } diff --git a/plugins/outputs/influxdb/client/http_test.go b/plugins/outputs/influxdb/client/http_test.go index d094078ea..2cb0182e8 100644 --- a/plugins/outputs/influxdb/client/http_test.go +++ b/plugins/outputs/influxdb/client/http_test.go @@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) { client, err := NewHTTP(config, wp) defer client.Close() assert.NoError(t, err) - n, err := client.Write([]byte("cpu value=99\n")) - assert.Equal(t, 13, n) - assert.NoError(t, err) - _, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13) - assert.NoError(t, err) -} - -func TestHTTPClient_WriteParamsOverride(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/write": - // test that database is set properly - if r.FormValue("db") != "override" { - w.WriteHeader(http.StatusTeapot) - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`) - } - - // Validate the request body: - buf := make([]byte, 100) - n, _ := r.Body.Read(buf) - expected := "cpu value=99" - got := string(buf[0 : n-1]) - if expected != got { - w.WriteHeader(http.StatusTeapot) - w.Header().Set("Content-Type", "application/json") - msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got) - fmt.Fprintln(w, msg) - } - - w.WriteHeader(http.StatusNoContent) - w.Header().Set("Content-Type", "application/json") - case "/query": - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"results":[{}]}`) - } - })) - defer ts.Close() - - config := HTTPConfig{ - URL: ts.URL, - } - defaultWP := WriteParams{ - Database: "test", - } - client, err := NewHTTP(config, defaultWP) - defer client.Close() - assert.NoError(t, err) - - // test that WriteWithParams overrides the default write params - wp := WriteParams{ - Database: "override", - } - n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp) - assert.Equal(t, 13, n) - assert.NoError(t, err) - - _, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp) + err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n"))) assert.NoError(t, err) } @@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) { assert.NoError(t, err) lp := []byte("cpu value=99\n") - n, err := client.Write(lp) - assert.Equal(t, 0, n) - assert.Error(t, err) - - n, err = client.WriteStream(bytes.NewReader(lp), 13) - assert.Equal(t, 0, n) - assert.Error(t, err) - - wp := WriteParams{ - Database: "override", - } - n, err = client.WriteWithParams(lp, wp) - assert.Equal(t, 0, n) - assert.Error(t, err) - - n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp) - assert.Equal(t, 0, n) + err = client.WriteStream(bytes.NewReader(lp)) assert.Error(t, err) } @@ -373,3 +299,37 @@ func TestGzipCompression(t *testing.T) { assert.Equal(t, []byte(influxLine), uncompressed.Bytes()) } + +func TestHTTPClient_PathPrefix(t *testing.T) { + prefix := "/some/random/prefix" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case prefix + "/write": + w.WriteHeader(http.StatusNoContent) + w.Header().Set("Content-Type", "application/json") + case prefix + "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + default: + w.WriteHeader(http.StatusNotFound) + msg := fmt.Sprintf("Path not found: %s", r.URL.Path) + fmt.Fprintln(w, msg) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL + prefix, + } + wp := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, wp) + defer client.Close() + assert.NoError(t, err) + err = client.Query("CREATE DATABASE test") + assert.NoError(t, err) + err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n"))) + assert.NoError(t, err) +} diff --git a/plugins/outputs/influxdb/client/udp.go b/plugins/outputs/influxdb/client/udp.go index 1dd4d9936..786b047fa 100644 --- a/plugins/outputs/influxdb/client/udp.go +++ b/plugins/outputs/influxdb/client/udp.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "fmt" "io" "log" @@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error { return nil } -// Write will send the byte stream to the given UDP client endpoint -func (c *udpClient) Write(b []byte) (int, error) { - return c.WriteStream(bytes.NewReader(b), -1) -} - -// WriteWithParams are ignored by the UDP client, will forward to WriteStream -func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { - return c.WriteStream(bytes.NewReader(b), -1) -} - // WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client -func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { +func (c *udpClient) WriteStream(r io.Reader) error { var totaln int for { nR, err := r.Read(c.buffer) @@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { break } if err != io.EOF && err != nil { - return totaln, err + return err } if c.buffer[nR-1] == uint8('\n') { nW, err := c.conn.Write(c.buffer[0:nR]) totaln += nW if err != nil { - return totaln, err + return err } } else { log.Printf("E! Could not fit point into UDP payload; dropping") @@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { break } if err != io.EOF && err != nil { - return totaln, err + return err } if c.buffer[nR-1] == uint8('\n') { break @@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { } } } - return totaln, nil -} - -// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client -// write params are ignored by the UDP client -func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { - return c.WriteStream(r, -1) + return nil } // Close will terminate the provided client connection diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go index 9308144b5..545f142f5 100644 --- a/plugins/outputs/influxdb/client/udp_test.go +++ b/plugins/outputs/influxdb/client/udp_test.go @@ -9,7 +9,6 @@ import ( "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestUDPClient(t *testing.T) { @@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) { } }() - // test sending simple metric - n, err := client.Write([]byte("cpu value=99\n")) - assert.Equal(t, n, 13) - assert.NoError(t, err) - pkt := <-packets - assert.Equal(t, "cpu value=99\n", pkt) - - wp := WriteParams{} - // - // Using WriteStream() & a metric.Reader: - config3 := UDPConfig{ - URL: "udp://localhost:8199", - PayloadSize: 40, - } - client3, err := NewUDP(config3) - assert.NoError(t, err) - - now := time.Unix(1484142942, 0) - m1, _ := metric.New("test", map[string]string{}, - map[string]interface{}{"value": 1.1}, now) - m2, _ := metric.New("test", map[string]string{}, - map[string]interface{}{"value": 1.1}, now) - m3, _ := metric.New("test", map[string]string{}, - map[string]interface{}{"value": 1.1}, now) - ms := []telegraf.Metric{m1, m2, m3} - mReader := metric.NewReader(ms) - n, err = client3.WriteStreamWithParams(mReader, 10, wp) - // 3 metrics at 35 bytes each (including the newline) - assert.Equal(t, 105, n) - assert.NoError(t, err) - pkt = <-packets - assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) - pkt = <-packets - assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) - pkt = <-packets - assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) - assert.NoError(t, client.Close()) config = UDPConfig{ @@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) { assert.NoError(t, err) ts := time.Unix(1484142943, 0) - m1, _ = metric.New("test", map[string]string{}, + m1, _ := metric.New("test", map[string]string{}, map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts) - m2, _ = metric.New("test", map[string]string{}, + m2, _ := metric.New("test", map[string]string{}, map[string]interface{}{"value": 1.1}, ts) - ms = []telegraf.Metric{m1, m2} + ms := []telegraf.Metric{m1, m2} reader := metric.NewReader(ms) - n, err = client4.WriteStream(reader, 0) + err = client4.WriteStream(reader) assert.NoError(t, err) - require.Equal(t, 35, n) - assert.NoError(t, err) - pkt = <-packets + pkt := <-packets assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt) assert.NoError(t, client4.Close()) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index f2f946513..98bf252c3 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -185,12 +185,6 @@ func (i *InfluxDB) Description() string { // Write will choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(metrics []telegraf.Metric) error { - - bufsize := 0 - for _, m := range metrics { - bufsize += m.Len() - } - r := metric.NewReader(metrics) // This will get set to nil if a successful write occurs @@ -198,7 +192,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { p := rand.Perm(len(i.clients)) for _, n := range p { - if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { + if e := i.clients[n].WriteStream(r); e != nil { // If the database was not found, try to recreate it: if strings.Contains(e.Error(), "database not found") { errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))