Use chunked transfer encoding in InfluxDB output (#3307)
This commit is contained in:
parent
6e1fa559a3
commit
cce40c515a
|
@ -4,13 +4,7 @@ import "io"
|
||||||
|
|
||||||
type Client interface {
|
type Client interface {
|
||||||
Query(command string) error
|
Query(command string) error
|
||||||
|
WriteStream(b io.Reader) error
|
||||||
Write(b []byte) (int, error)
|
|
||||||
WriteWithParams(b []byte, params WriteParams) (int, error)
|
|
||||||
|
|
||||||
WriteStream(b io.Reader, contentLength int) (int, error)
|
|
||||||
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
|
|
||||||
|
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,60 +137,13 @@ func (c *httpClient) Query(command string) error {
|
||||||
return c.doRequest(req, http.StatusOK)
|
return c.doRequest(req, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Write(b []byte) (int, error) {
|
func (c *httpClient) WriteStream(r io.Reader) error {
|
||||||
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
|
req, err := c.makeWriteRequest(r, c.writeURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.doRequest(req, http.StatusNoContent)
|
return c.doRequest(req, http.StatusNoContent)
|
||||||
if err == nil {
|
|
||||||
return len(b), nil
|
|
||||||
}
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
|
||||||
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.doRequest(req, http.StatusNoContent)
|
|
||||||
if err == nil {
|
|
||||||
return len(b), nil
|
|
||||||
}
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
|
||||||
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.doRequest(req, http.StatusNoContent)
|
|
||||||
if err == nil {
|
|
||||||
return contentLength, nil
|
|
||||||
}
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *httpClient) WriteStreamWithParams(
|
|
||||||
r io.Reader,
|
|
||||||
contentLength int,
|
|
||||||
wp WriteParams,
|
|
||||||
) (int, error) {
|
|
||||||
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.doRequest(req, http.StatusNoContent)
|
|
||||||
if err == nil {
|
|
||||||
return contentLength, nil
|
|
||||||
}
|
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) doRequest(
|
func (c *httpClient) doRequest(
|
||||||
|
@ -232,7 +185,6 @@ func (c *httpClient) doRequest(
|
||||||
|
|
||||||
func (c *httpClient) makeWriteRequest(
|
func (c *httpClient) makeWriteRequest(
|
||||||
body io.Reader,
|
body io.Reader,
|
||||||
contentLength int,
|
|
||||||
writeURL string,
|
writeURL string,
|
||||||
) (*http.Request, error) {
|
) (*http.Request, error) {
|
||||||
req, err := c.makeRequest(writeURL, body)
|
req, err := c.makeRequest(writeURL, body)
|
||||||
|
@ -241,8 +193,6 @@ func (c *httpClient) makeWriteRequest(
|
||||||
}
|
}
|
||||||
if c.config.ContentEncoding == "gzip" {
|
if c.config.ContentEncoding == "gzip" {
|
||||||
req.Header.Set("Content-Encoding", "gzip")
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
} else {
|
|
||||||
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
|
|
||||||
}
|
}
|
||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) {
|
||||||
client, err := NewHTTP(config, wp)
|
client, err := NewHTTP(config, wp)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
n, err := client.Write([]byte("cpu value=99\n"))
|
|
||||||
assert.Equal(t, 13, n)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
|
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
switch r.URL.Path {
|
|
||||||
case "/write":
|
|
||||||
// test that database is set properly
|
|
||||||
if r.FormValue("db") != "override" {
|
|
||||||
w.WriteHeader(http.StatusTeapot)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate the request body:
|
|
||||||
buf := make([]byte, 100)
|
|
||||||
n, _ := r.Body.Read(buf)
|
|
||||||
expected := "cpu value=99"
|
|
||||||
got := string(buf[0 : n-1])
|
|
||||||
if expected != got {
|
|
||||||
w.WriteHeader(http.StatusTeapot)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
|
|
||||||
fmt.Fprintln(w, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
case "/query":
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
fmt.Fprintln(w, `{"results":[{}]}`)
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
defer ts.Close()
|
|
||||||
|
|
||||||
config := HTTPConfig{
|
|
||||||
URL: ts.URL,
|
|
||||||
}
|
|
||||||
defaultWP := WriteParams{
|
|
||||||
Database: "test",
|
|
||||||
}
|
|
||||||
client, err := NewHTTP(config, defaultWP)
|
|
||||||
defer client.Close()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// test that WriteWithParams overrides the default write params
|
|
||||||
wp := WriteParams{
|
|
||||||
Database: "override",
|
|
||||||
}
|
|
||||||
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
|
|
||||||
assert.Equal(t, 13, n)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
lp := []byte("cpu value=99\n")
|
lp := []byte("cpu value=99\n")
|
||||||
n, err := client.Write(lp)
|
err = client.WriteStream(bytes.NewReader(lp))
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Error(t, err)
|
|
||||||
|
|
||||||
n, err = client.WriteStream(bytes.NewReader(lp), 13)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Error(t, err)
|
|
||||||
|
|
||||||
wp := WriteParams{
|
|
||||||
Database: "override",
|
|
||||||
}
|
|
||||||
n, err = client.WriteWithParams(lp, wp)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Error(t, err)
|
|
||||||
|
|
||||||
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,8 +330,6 @@ func TestHTTPClient_PathPrefix(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
err = client.Query("CREATE DATABASE test")
|
err = client.Query("CREATE DATABASE test")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
_, err = client.Write([]byte("cpu value=99\n"))
|
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
|
||||||
assert.NoError(t, err)
|
|
||||||
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write will send the byte stream to the given UDP client endpoint
|
|
||||||
func (c *udpClient) Write(b []byte) (int, error) {
|
|
||||||
return c.WriteStream(bytes.NewReader(b), -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
|
|
||||||
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
|
||||||
return c.WriteStream(bytes.NewReader(b), -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
|
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
|
||||||
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
func (c *udpClient) WriteStream(r io.Reader) error {
|
||||||
var totaln int
|
var totaln int
|
||||||
for {
|
for {
|
||||||
nR, err := r.Read(c.buffer)
|
nR, err := r.Read(c.buffer)
|
||||||
|
@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != io.EOF && err != nil {
|
if err != io.EOF && err != nil {
|
||||||
return totaln, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.buffer[nR-1] == uint8('\n') {
|
if c.buffer[nR-1] == uint8('\n') {
|
||||||
nW, err := c.conn.Write(c.buffer[0:nR])
|
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||||
totaln += nW
|
totaln += nW
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return totaln, err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Printf("E! Could not fit point into UDP payload; dropping")
|
log.Printf("E! Could not fit point into UDP payload; dropping")
|
||||||
|
@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != io.EOF && err != nil {
|
if err != io.EOF && err != nil {
|
||||||
return totaln, err
|
return err
|
||||||
}
|
}
|
||||||
if c.buffer[nR-1] == uint8('\n') {
|
if c.buffer[nR-1] == uint8('\n') {
|
||||||
break
|
break
|
||||||
|
@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totaln, nil
|
return nil
|
||||||
}
|
|
||||||
|
|
||||||
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
|
|
||||||
// write params are ignored by the UDP client
|
|
||||||
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
|
|
||||||
return c.WriteStream(r, -1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close will terminate the provided client connection
|
// Close will terminate the provided client connection
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestUDPClient(t *testing.T) {
|
func TestUDPClient(t *testing.T) {
|
||||||
|
@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// test sending simple metric
|
|
||||||
n, err := client.Write([]byte("cpu value=99\n"))
|
|
||||||
assert.Equal(t, n, 13)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt := <-packets
|
|
||||||
assert.Equal(t, "cpu value=99\n", pkt)
|
|
||||||
|
|
||||||
wp := WriteParams{}
|
|
||||||
//
|
|
||||||
// Using WriteStream() & a metric.Reader:
|
|
||||||
config3 := UDPConfig{
|
|
||||||
URL: "udp://localhost:8199",
|
|
||||||
PayloadSize: 40,
|
|
||||||
}
|
|
||||||
client3, err := NewUDP(config3)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
now := time.Unix(1484142942, 0)
|
|
||||||
m1, _ := metric.New("test", map[string]string{},
|
|
||||||
map[string]interface{}{"value": 1.1}, now)
|
|
||||||
m2, _ := metric.New("test", map[string]string{},
|
|
||||||
map[string]interface{}{"value": 1.1}, now)
|
|
||||||
m3, _ := metric.New("test", map[string]string{},
|
|
||||||
map[string]interface{}{"value": 1.1}, now)
|
|
||||||
ms := []telegraf.Metric{m1, m2, m3}
|
|
||||||
mReader := metric.NewReader(ms)
|
|
||||||
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
|
|
||||||
// 3 metrics at 35 bytes each (including the newline)
|
|
||||||
assert.Equal(t, 105, n)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
|
||||||
|
|
||||||
assert.NoError(t, client.Close())
|
assert.NoError(t, client.Close())
|
||||||
|
|
||||||
config = UDPConfig{
|
config = UDPConfig{
|
||||||
|
@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
ts := time.Unix(1484142943, 0)
|
ts := time.Unix(1484142943, 0)
|
||||||
m1, _ = metric.New("test", map[string]string{},
|
m1, _ := metric.New("test", map[string]string{},
|
||||||
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
|
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
|
||||||
m2, _ = metric.New("test", map[string]string{},
|
m2, _ := metric.New("test", map[string]string{},
|
||||||
map[string]interface{}{"value": 1.1}, ts)
|
map[string]interface{}{"value": 1.1}, ts)
|
||||||
ms = []telegraf.Metric{m1, m2}
|
ms := []telegraf.Metric{m1, m2}
|
||||||
reader := metric.NewReader(ms)
|
reader := metric.NewReader(ms)
|
||||||
n, err = client4.WriteStream(reader, 0)
|
err = client4.WriteStream(reader)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
require.Equal(t, 35, n)
|
pkt := <-packets
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
|
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
|
||||||
|
|
||||||
assert.NoError(t, client4.Close())
|
assert.NoError(t, client4.Close())
|
||||||
|
|
|
@ -183,12 +183,6 @@ func (i *InfluxDB) Description() string {
|
||||||
// Write will choose a random server in the cluster to write to until a successful write
|
// Write will choose a random server in the cluster to write to until a successful write
|
||||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
bufsize := 0
|
|
||||||
for _, m := range metrics {
|
|
||||||
bufsize += m.Len()
|
|
||||||
}
|
|
||||||
|
|
||||||
r := metric.NewReader(metrics)
|
r := metric.NewReader(metrics)
|
||||||
|
|
||||||
// This will get set to nil if a successful write occurs
|
// This will get set to nil if a successful write occurs
|
||||||
|
@ -196,7 +190,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
p := rand.Perm(len(i.clients))
|
p := rand.Perm(len(i.clients))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
if e := i.clients[n].WriteStream(r); e != nil {
|
||||||
// If the database was not found, try to recreate it:
|
// If the database was not found, try to recreate it:
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||||
|
|
Loading…
Reference in New Issue