Use chunked transfer encoding in InfluxDB output (#3307)
This commit is contained in:
parent
852a419fa5
commit
bb448d5af7
|
@ -4,13 +4,7 @@ import "io"
|
|||
|
||||
type Client interface {
|
||||
Query(command string) error
|
||||
|
||||
Write(b []byte) (int, error)
|
||||
WriteWithParams(b []byte, params WriteParams) (int, error)
|
||||
|
||||
WriteStream(b io.Reader, contentLength int) (int, error)
|
||||
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
|
||||
|
||||
WriteStream(b io.Reader) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
|
|
|
@ -137,60 +137,13 @@ func (c *httpClient) Query(command string) error {
|
|||
return c.doRequest(req, http.StatusOK)
|
||||
}
|
||||
|
||||
func (c *httpClient) Write(b []byte) (int, error) {
|
||||
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
|
||||
func (c *httpClient) WriteStream(r io.Reader) error {
|
||||
req, err := c.makeWriteRequest(r, c.writeURL)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.doRequest(req, http.StatusNoContent)
|
||||
if err == nil {
|
||||
return len(b), nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err = c.doRequest(req, http.StatusNoContent)
|
||||
if err == nil {
|
||||
return len(b), nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err = c.doRequest(req, http.StatusNoContent)
|
||||
if err == nil {
|
||||
return contentLength, nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func (c *httpClient) WriteStreamWithParams(
|
||||
r io.Reader,
|
||||
contentLength int,
|
||||
wp WriteParams,
|
||||
) (int, error) {
|
||||
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err = c.doRequest(req, http.StatusNoContent)
|
||||
if err == nil {
|
||||
return contentLength, nil
|
||||
}
|
||||
return 0, err
|
||||
return c.doRequest(req, http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (c *httpClient) doRequest(
|
||||
|
@ -232,7 +185,6 @@ func (c *httpClient) doRequest(
|
|||
|
||||
func (c *httpClient) makeWriteRequest(
|
||||
body io.Reader,
|
||||
contentLength int,
|
||||
writeURL string,
|
||||
) (*http.Request, error) {
|
||||
req, err := c.makeRequest(writeURL, body)
|
||||
|
@ -241,8 +193,6 @@ func (c *httpClient) makeWriteRequest(
|
|||
}
|
||||
if c.config.ContentEncoding == "gzip" {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
} else {
|
||||
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
|
|
@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) {
|
|||
client, err := NewHTTP(config, wp)
|
||||
defer client.Close()
|
||||
assert.NoError(t, err)
|
||||
n, err := client.Write([]byte("cpu value=99\n"))
|
||||
assert.Equal(t, 13, n)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/write":
|
||||
// test that database is set properly
|
||||
if r.FormValue("db") != "override" {
|
||||
w.WriteHeader(http.StatusTeapot)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
|
||||
}
|
||||
|
||||
// Validate the request body:
|
||||
buf := make([]byte, 100)
|
||||
n, _ := r.Body.Read(buf)
|
||||
expected := "cpu value=99"
|
||||
got := string(buf[0 : n-1])
|
||||
if expected != got {
|
||||
w.WriteHeader(http.StatusTeapot)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
|
||||
fmt.Fprintln(w, msg)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
case "/query":
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
config := HTTPConfig{
|
||||
URL: ts.URL,
|
||||
}
|
||||
defaultWP := WriteParams{
|
||||
Database: "test",
|
||||
}
|
||||
client, err := NewHTTP(config, defaultWP)
|
||||
defer client.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// test that WriteWithParams overrides the default write params
|
||||
wp := WriteParams{
|
||||
Database: "override",
|
||||
}
|
||||
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
|
||||
assert.Equal(t, 13, n)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
|
||||
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
lp := []byte("cpu value=99\n")
|
||||
n, err := client.Write(lp)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Error(t, err)
|
||||
|
||||
n, err = client.WriteStream(bytes.NewReader(lp), 13)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Error(t, err)
|
||||
|
||||
wp := WriteParams{
|
||||
Database: "override",
|
||||
}
|
||||
n, err = client.WriteWithParams(lp, wp)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Error(t, err)
|
||||
|
||||
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
|
||||
assert.Equal(t, 0, n)
|
||||
err = client.WriteStream(bytes.NewReader(lp))
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
@ -404,8 +330,6 @@ func TestHTTPClient_PathPrefix(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
err = client.Query("CREATE DATABASE test")
|
||||
assert.NoError(t, err)
|
||||
_, err = client.Write([]byte("cpu value=99\n"))
|
||||
assert.NoError(t, err)
|
||||
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
|
||||
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Write will send the byte stream to the given UDP client endpoint
|
||||
func (c *udpClient) Write(b []byte) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
|
||||
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(bytes.NewReader(b), -1)
|
||||
}
|
||||
|
||||
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
|
||||
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
func (c *udpClient) WriteStream(r io.Reader) error {
|
||||
var totaln int
|
||||
for {
|
||||
nR, err := r.Read(c.buffer)
|
||||
|
@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
|||
break
|
||||
}
|
||||
if err != io.EOF && err != nil {
|
||||
return totaln, err
|
||||
return err
|
||||
}
|
||||
|
||||
if c.buffer[nR-1] == uint8('\n') {
|
||||
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||
totaln += nW
|
||||
if err != nil {
|
||||
return totaln, err
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.Printf("E! Could not fit point into UDP payload; dropping")
|
||||
|
@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
|||
break
|
||||
}
|
||||
if err != io.EOF && err != nil {
|
||||
return totaln, err
|
||||
return err
|
||||
}
|
||||
if c.buffer[nR-1] == uint8('\n') {
|
||||
break
|
||||
|
@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
return totaln, nil
|
||||
}
|
||||
|
||||
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
|
||||
// write params are ignored by the UDP client
|
||||
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
|
||||
return c.WriteStream(r, -1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close will terminate the provided client connection
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/influxdata/telegraf/metric"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUDPClient(t *testing.T) {
|
||||
|
@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
// test sending simple metric
|
||||
n, err := client.Write([]byte("cpu value=99\n"))
|
||||
assert.Equal(t, n, 13)
|
||||
assert.NoError(t, err)
|
||||
pkt := <-packets
|
||||
assert.Equal(t, "cpu value=99\n", pkt)
|
||||
|
||||
wp := WriteParams{}
|
||||
//
|
||||
// Using WriteStream() & a metric.Reader:
|
||||
config3 := UDPConfig{
|
||||
URL: "udp://localhost:8199",
|
||||
PayloadSize: 40,
|
||||
}
|
||||
client3, err := NewUDP(config3)
|
||||
assert.NoError(t, err)
|
||||
|
||||
now := time.Unix(1484142942, 0)
|
||||
m1, _ := metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"value": 1.1}, now)
|
||||
m2, _ := metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"value": 1.1}, now)
|
||||
m3, _ := metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"value": 1.1}, now)
|
||||
ms := []telegraf.Metric{m1, m2, m3}
|
||||
mReader := metric.NewReader(ms)
|
||||
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
|
||||
// 3 metrics at 35 bytes each (including the newline)
|
||||
assert.Equal(t, 105, n)
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||
|
||||
assert.NoError(t, client.Close())
|
||||
|
||||
config = UDPConfig{
|
||||
|
@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
ts := time.Unix(1484142943, 0)
|
||||
m1, _ = metric.New("test", map[string]string{},
|
||||
m1, _ := metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
|
||||
m2, _ = metric.New("test", map[string]string{},
|
||||
m2, _ := metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"value": 1.1}, ts)
|
||||
ms = []telegraf.Metric{m1, m2}
|
||||
ms := []telegraf.Metric{m1, m2}
|
||||
reader := metric.NewReader(ms)
|
||||
n, err = client4.WriteStream(reader, 0)
|
||||
err = client4.WriteStream(reader)
|
||||
assert.NoError(t, err)
|
||||
require.Equal(t, 35, n)
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
pkt := <-packets
|
||||
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
|
||||
|
||||
assert.NoError(t, client4.Close())
|
||||
|
|
|
@ -183,12 +183,6 @@ func (i *InfluxDB) Description() string {
|
|||
// Write will choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||
|
||||
bufsize := 0
|
||||
for _, m := range metrics {
|
||||
bufsize += m.Len()
|
||||
}
|
||||
|
||||
r := metric.NewReader(metrics)
|
||||
|
||||
// This will get set to nil if a successful write occurs
|
||||
|
@ -196,7 +190,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
p := rand.Perm(len(i.clients))
|
||||
for _, n := range p {
|
||||
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
||||
if e := i.clients[n].WriteStream(r); e != nil {
|
||||
// If the database was not found, try to recreate it:
|
||||
if strings.Contains(e.Error(), "database not found") {
|
||||
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||
|
|
Loading…
Reference in New Issue