Use chunked transfer encoding in InfluxDB output (#3307)

(cherry picked from commit cce40c515a)
This commit is contained in:
Daniel Nelson 2017-10-05 16:14:21 -07:00 committed by Daniel Nelson
parent 3c9d7db0a0
commit 4301b8e32a
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
6 changed files with 52 additions and 211 deletions

View File

@ -4,13 +4,7 @@ import "io"
type Client interface { type Client interface {
Query(command string) error Query(command string) error
WriteStream(b io.Reader) error
Write(b []byte) (int, error)
WriteWithParams(b []byte, params WriteParams) (int, error)
WriteStream(b io.Reader, contentLength int) (int, error)
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
Close() error Close() error
} }

View File

@ -136,60 +136,13 @@ func (c *httpClient) Query(command string) error {
return c.doRequest(req, http.StatusOK) return c.doRequest(req, http.StatusOK)
} }
func (c *httpClient) Write(b []byte) (int, error) { func (c *httpClient) WriteStream(r io.Reader) error {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL) req, err := c.makeWriteRequest(r, c.writeURL)
if err != nil { if err != nil {
return 0, nil return err
} }
err = c.doRequest(req, http.StatusNoContent) return c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}
func (c *httpClient) WriteStreamWithParams(
r io.Reader,
contentLength int,
wp WriteParams,
) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
} }
func (c *httpClient) doRequest( func (c *httpClient) doRequest(
@ -231,7 +184,6 @@ func (c *httpClient) doRequest(
func (c *httpClient) makeWriteRequest( func (c *httpClient) makeWriteRequest(
body io.Reader, body io.Reader,
contentLength int,
writeURL string, writeURL string,
) (*http.Request, error) { ) (*http.Request, error) {
req, err := c.makeRequest(writeURL, body) req, err := c.makeRequest(writeURL, body)
@ -240,8 +192,6 @@ func (c *httpClient) makeWriteRequest(
} }
if c.config.ContentEncoding == "gzip" { if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
} else {
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
} }
return req, nil return req, nil
} }

View File

@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) {
client, err := NewHTTP(config, wp) client, err := NewHTTP(config, wp)
defer client.Close() defer client.Close()
assert.NoError(t, err) assert.NoError(t, err)
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13) err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "override" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
// test that WriteWithParams overrides the default write params
wp := WriteParams{
Database: "override",
}
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
lp := []byte("cpu value=99\n") lp := []byte("cpu value=99\n")
n, err := client.Write(lp) err = client.WriteStream(bytes.NewReader(lp))
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStream(bytes.NewReader(lp), 13)
assert.Equal(t, 0, n)
assert.Error(t, err)
wp := WriteParams{
Database: "override",
}
n, err = client.WriteWithParams(lp, wp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
assert.Equal(t, 0, n)
assert.Error(t, err) assert.Error(t, err)
} }
@ -373,3 +299,37 @@ func TestGzipCompression(t *testing.T) {
assert.Equal(t, []byte(influxLine), uncompressed.Bytes()) assert.Equal(t, []byte(influxLine), uncompressed.Bytes())
} }
func TestHTTPClient_PathPrefix(t *testing.T) {
prefix := "/some/random/prefix"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case prefix + "/write":
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case prefix + "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
default:
w.WriteHeader(http.StatusNotFound)
msg := fmt.Sprintf("Path not found: %s", r.URL.Path)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL + prefix,
}
wp := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
err = client.Query("CREATE DATABASE test")
assert.NoError(t, err)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error {
return nil return nil
} }
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client // WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { func (c *udpClient) WriteStream(r io.Reader) error {
var totaln int var totaln int
for { for {
nR, err := r.Read(c.buffer) nR, err := r.Read(c.buffer)
@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break break
} }
if err != io.EOF && err != nil { if err != io.EOF && err != nil {
return totaln, err return err
} }
if c.buffer[nR-1] == uint8('\n') { if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR]) nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW totaln += nW
if err != nil { if err != nil {
return totaln, err return err
} }
} else { } else {
log.Printf("E! Could not fit point into UDP payload; dropping") log.Printf("E! Could not fit point into UDP payload; dropping")
@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break break
} }
if err != io.EOF && err != nil { if err != io.EOF && err != nil {
return totaln, err return err
} }
if c.buffer[nR-1] == uint8('\n') { if c.buffer[nR-1] == uint8('\n') {
break break
@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
} }
} }
} }
return totaln, nil return nil
}
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
} }
// Close will terminate the provided client connection // Close will terminate the provided client connection

View File

@ -9,7 +9,6 @@ import (
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestUDPClient(t *testing.T) { func TestUDPClient(t *testing.T) {
@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) {
} }
}() }()
// test sending simple metric
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, n, 13)
assert.NoError(t, err)
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)
wp := WriteParams{}
//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client3, err := NewUDP(config3)
assert.NoError(t, err)
now := time.Unix(1484142942, 0)
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m3, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
ms := []telegraf.Metric{m1, m2, m3}
mReader := metric.NewReader(ms)
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
// 3 metrics at 35 bytes each (including the newline)
assert.Equal(t, 105, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close()) assert.NoError(t, client.Close())
config = UDPConfig{ config = UDPConfig{
@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
ts := time.Unix(1484142943, 0) ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{}, m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts) map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{}, m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts) map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2} ms := []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms) reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0) err = client4.WriteStream(reader)
assert.NoError(t, err) assert.NoError(t, err)
require.Equal(t, 35, n) pkt := <-packets
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt) assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close()) assert.NoError(t, client4.Close())

View File

@ -185,12 +185,6 @@ func (i *InfluxDB) Description() string {
// Write will choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
}
r := metric.NewReader(metrics) r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
@ -198,7 +192,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
p := rand.Perm(len(i.clients)) p := rand.Perm(len(i.clients))
for _, n := range p { for _, n := range p {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { if e := i.clients[n].WriteStream(r); e != nil {
// If the database was not found, try to recreate it: // If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") { if strings.Contains(e.Error(), "database not found") {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database))) errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))