Improve the InfluxDB through-put performance

This changes the current use of the InfluxDB client to instead use a
baked-in client that uses the fasthttp library.

This allows for significantly smaller allocations, the re-use of http
body buffers, and the re-use of the actual bytes of the line-protocol
metric representations.
This commit is contained in:
Cameron Sparr
2016-12-04 20:18:13 +00:00
parent 168270ea5f
commit 4a5d313693
13 changed files with 1735 additions and 85 deletions

View File

@@ -300,6 +300,9 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
func (h *HTTPListener) parse(b []byte, t time.Time) error {
if !bytes.HasSuffix(b, []byte("\n")) {
b = append(b, '\n')
}
metrics, err := h.parser.ParseWithDefaultTime(b, t)
for _, m := range metrics {

View File

@@ -0,0 +1,22 @@
package client
import "io"
type Client interface {
Query(command string) error
Write(b []byte) (int, error)
WriteWithParams(b []byte, params WriteParams) (int, error)
WriteStream(b io.Reader, contentLength int) (int, error)
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
Close() error
}
type WriteParams struct {
Database string
RetentionPolicy string
Precision string
Consistency string
}

View File

@@ -0,0 +1,258 @@
package client
import (
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/url"
"time"
"github.com/valyala/fasthttp"
)
var (
defaultRequestTimeout = time.Second * 5
)
//
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters:
if len(config.URL) == 0 {
return nil, fmt.Errorf("config.URL is required to create an HTTP client")
}
if len(defaultWP.Database) == 0 {
return nil, fmt.Errorf("A default database is required to create an HTTP client")
}
// set defaults:
if config.Timeout == 0 {
config.Timeout = defaultRequestTimeout
}
// parse URL:
u, err := url.Parse(config.URL)
if err != nil {
return nil, fmt.Errorf("error parsing config.URL: %s", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
}
wu := writeURL(u, defaultWP)
return &httpClient{
writeURL: []byte(wu),
config: config,
url: u,
client: &fasthttp.Client{
TLSConfig: config.TLSConfig,
},
}, nil
}
type HTTPConfig struct {
// URL should be of the form "http://host:port" (REQUIRED)
URL string
// UserAgent sets the User-Agent header.
UserAgent string
// Timeout is the time to wait for a response to each HTTP request (writes
// and queries).
Timeout time.Duration
// Username is the basic auth username for the server.
Username string
// Password is the basic auth password for the server.
Password string
// TLSConfig is the tls auth settings to use for each request.
TLSConfig *tls.Config
// Gzip, if true, compresses each payload using gzip.
// TODO
// Gzip bool
}
// Response represents a list of statement results.
type Response struct {
// ignore Results:
Results []interface{} `json:"-"`
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return fmt.Errorf(r.Err)
}
return nil
}
type httpClient struct {
writeURL []byte
config HTTPConfig
client *fasthttp.Client
url *url.URL
}
func (c *httpClient) Query(command string) error {
req := c.makeRequest()
req.Header.SetRequestURI(queryURL(c.url, command))
return c.doRequest(req, fasthttp.StatusOK)
}
func (c *httpClient) Write(b []byte) (int, error) {
req := c.makeWriteRequest(len(b), c.writeURL)
req.SetBody(b)
err := c.doRequest(req, fasthttp.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp)))
req.SetBody(b)
err := c.doRequest(req, fasthttp.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
req := c.makeWriteRequest(contentLength, c.writeURL)
req.SetBodyStream(r, contentLength)
err := c.doRequest(req, fasthttp.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}
func (c *httpClient) WriteStreamWithParams(
r io.Reader,
contentLength int,
wp WriteParams,
) (int, error) {
req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp)))
req.SetBodyStream(r, contentLength)
err := c.doRequest(req, fasthttp.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}
func (c *httpClient) doRequest(
req *fasthttp.Request,
expectedCode int,
) error {
resp := fasthttp.AcquireResponse()
err := c.client.DoTimeout(req, resp, c.config.Timeout)
code := resp.StatusCode()
// If it's a "no content" response, then release and return nil
if code == fasthttp.StatusNoContent {
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
return nil
}
// not a "no content" response, so parse the result:
var response Response
decErr := json.Unmarshal(resp.Body(), &response)
// If we got a JSON decode error, send that back
if decErr != nil {
err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr)
}
// Unexpected response code OR error in JSON response body overrides
// a JSON decode error:
if code != expectedCode || response.Error() != nil {
err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]",
code, expectedCode, response.Error())
}
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
return err
}
func (c *httpClient) makeWriteRequest(
contentLength int,
writeURL []byte,
) *fasthttp.Request {
req := c.makeRequest()
req.Header.SetContentLength(contentLength)
req.Header.SetRequestURIBytes(writeURL)
// TODO
// if gzip {
// req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
// }
return req
}
func (c *httpClient) makeRequest() *fasthttp.Request {
req := fasthttp.AcquireRequest()
req.Header.SetContentTypeBytes([]byte("text/plain"))
req.Header.SetMethodBytes([]byte("POST"))
req.Header.SetUserAgent(c.config.UserAgent)
if c.config.Username != "" && c.config.Password != "" {
req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password))
}
return req
}
func (c *httpClient) Close() error {
// Nothing to do.
return nil
}
func writeURL(u *url.URL, wp WriteParams) string {
params := url.Values{}
params.Set("db", wp.Database)
if wp.RetentionPolicy != "" {
params.Set("rp", wp.RetentionPolicy)
}
if wp.Precision != "n" && wp.Precision != "" {
params.Set("precision", wp.Precision)
}
if wp.Consistency != "one" && wp.Consistency != "" {
params.Set("consistency", wp.Consistency)
}
u.RawQuery = params.Encode()
u.Path = "write"
return u.String()
}
func queryURL(u *url.URL, command string) string {
params := url.Values{}
params.Set("q", command)
u.RawQuery = params.Encode()
u.Path = "query"
return u.String()
}
// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt
// "To receive authorization, the httpClient sends the userid and password,
// separated by a single colon (":") character, within a base64
// encoded string in the credentials."
// It is not meant to be urlencoded.
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}

View File

@@ -0,0 +1,343 @@
package client
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)
func TestHTTPClient_Write(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test form values:
if r.FormValue("db") != "test" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
if r.FormValue("rp") != "policy" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong rp name"}`)
}
if r.FormValue("precision") != "ns" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong precision"}`)
}
if r.FormValue("consistency") != "all" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong consistency"}`)
}
// test that user agent is set properly
if r.UserAgent() != "test-agent" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong agent name"}`)
}
// test basic auth params
user, pass, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth not set"}`)
}
if user != "test-user" || pass != "test-password" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`)
}
// Validate Content-Length Header
if r.ContentLength != 13 {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"Content-Length: expected [13], got [%d]"}`, r.ContentLength)
fmt.Fprintln(w, msg)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
UserAgent: "test-agent",
Username: "test-user",
Password: "test-password",
}
wp := WriteParams{
Database: "test",
RetentionPolicy: "policy",
Precision: "ns",
Consistency: "all",
}
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
assert.NoError(t, err)
}
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "override" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
// test that WriteWithParams overrides the default write params
wp := WriteParams{
Database: "override",
}
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
assert.NoError(t, err)
}
func TestHTTPClient_Write_Errors(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusTeapot)
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
lp := []byte("cpu value=99\n")
n, err := client.Write(lp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStream(bytes.NewReader(lp), 13)
assert.Equal(t, 0, n)
assert.Error(t, err)
wp := WriteParams{
Database: "override",
}
n, err = client.WriteWithParams(lp, wp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
assert.Equal(t, 0, n)
assert.Error(t, err)
}
func TestNewHTTPErrors(t *testing.T) {
// No URL:
config := HTTPConfig{}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
assert.Error(t, err)
assert.Nil(t, client)
// No Database:
config = HTTPConfig{
URL: "http://localhost:8086",
}
defaultWP = WriteParams{}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
// Invalid URL:
config = HTTPConfig{
URL: "http://192.168.0.%31:8080/",
}
defaultWP = WriteParams{
Database: "test",
}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
// Invalid URL scheme:
config = HTTPConfig{
URL: "mailto://localhost:8086",
}
defaultWP = WriteParams{
Database: "test",
}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
}
func TestHTTPClient_Query(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
// validate the create database command is correct
got := r.FormValue("q")
if got != command {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"got %s, expected %s"}`, got, command)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.NoError(t, err)
}
func TestHTTPClient_Query_ResponseError(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"couldnt create database"}`)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.Error(t, err)
}
func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
// write JSON missing a ']'
msg := fmt.Sprintf(`{"results":[{}}`)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.Error(t, err)
assert.Contains(t, err.Error(), "json")
}

View File

@@ -0,0 +1,99 @@
package client
import (
"bytes"
"fmt"
"io"
"net"
"net/url"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client
type UDPConfig struct {
// URL should be of the form "udp://host:port"
// or "udp://[ipv6-host%zone]:port".
URL string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL)
if err != nil {
return nil, fmt.Errorf("Error parsing UDP url [%s]: %s", config.URL, err)
}
udpAddr, err := net.ResolveUDPAddr("udp", p.Host)
if err != nil {
return nil, fmt.Errorf("Error resolving UDP Address [%s]: %s", p.Host, err)
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, fmt.Errorf("Error dialing UDP address [%s]: %s",
udpAddr.String(), err)
}
size := config.PayloadSize
if size == 0 {
size = UDPPayloadSize
}
buf := make([]byte, size)
return &udpClient{conn: conn, buffer: buf}, nil
}
type udpClient struct {
conn *net.UDPConn
buffer []byte
}
func (c *udpClient) Query(command string) error {
return nil
}
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// write params are ignored by the UDP client
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// contentLength is ignored by the UDP client.
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return totaln, err
}
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
}
}
return totaln, nil
}
// contentLength is ignored by the UDP client.
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
}
func (c *udpClient) Close() error {
return c.conn.Close()
}

View File

@@ -0,0 +1,163 @@
package client
import (
"bytes"
"net"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func TestUDPClient(t *testing.T) {
config := UDPConfig{
URL: "udp://localhost:8089",
}
client, err := NewUDP(config)
assert.NoError(t, err)
err = client.Query("ANY QUERY RETURNS NIL")
assert.NoError(t, err)
assert.NoError(t, client.Close())
}
func TestNewUDPClient_Errors(t *testing.T) {
// url.Parse Error
config := UDPConfig{
URL: "udp://localhost%35:8089",
}
_, err := NewUDP(config)
assert.Error(t, err)
// ResolveUDPAddr Error
config = UDPConfig{
URL: "udp://localhost:999999",
}
_, err = NewUDP(config)
assert.Error(t, err)
}
func TestUDPClient_Write(t *testing.T) {
config := UDPConfig{
URL: "udp://localhost:8199",
}
client, err := NewUDP(config)
assert.NoError(t, err)
packets := make(chan string, 100)
address, err := net.ResolveUDPAddr("udp", "localhost:8199")
assert.NoError(t, err)
listener, err := net.ListenUDP("udp", address)
defer listener.Close()
assert.NoError(t, err)
go func() {
buf := make([]byte, 200)
for {
n, _, err := listener.ReadFromUDP(buf)
if err != nil {
packets <- err.Error()
}
packets <- string(buf[0:n])
}
}()
// test sending simple metric
time.Sleep(time.Second)
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, n, 13)
assert.NoError(t, err)
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)
metrics := `cpu value=99
cpu value=55
cpu value=44
cpu value=101
cpu value=91
cpu value=92
`
// test sending packet with 6 metrics in a stream.
reader := bytes.NewReader([]byte(metrics))
// contentLength is ignored:
n, err = client.WriteStream(reader, 10)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
//
// Test that UDP packets get broken up properly:
config2 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 25,
}
client2, err := NewUDP(config2)
assert.NoError(t, err)
wp := WriteParams{}
//
// Using Write():
buf := []byte(metrics)
n, err = client2.WriteWithParams(buf, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream():
reader = bytes.NewReader([]byte(metrics))
n, err = client2.WriteStreamWithParams(reader, 10, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client3, err := NewUDP(config3)
assert.NoError(t, err)
now := time.Unix(1484142942, 0)
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m3, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
ms := []telegraf.Metric{m1, m2, m3}
mReader := metric.NewReader(ms)
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
// 3 metrics at 35 bytes each (including the newline)
assert.Equal(t, 105, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close())
}

View File

@@ -1,19 +1,18 @@
package influxdb
import (
"errors"
"fmt"
"log"
"math/rand"
"net/url"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
)
type InfluxDB struct {
@@ -41,7 +40,7 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored.
Precision string
conns []client.Client
clients []client.Client
}
var sampleConfig = `
@@ -88,79 +87,56 @@ func (i *InfluxDB) Connect() error {
urls = append(urls, i.URL)
}
tlsCfg, err := internal.GetTLSConfig(
tlsConfig, err := internal.GetTLSConfig(
i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
if err != nil {
return err
}
var conns []client.Client
for _, u := range urls {
switch {
case strings.HasPrefix(u, "udp"):
parsed_url, err := url.Parse(u)
if err != nil {
return err
}
if i.UDPPayload == 0 {
i.UDPPayload = client.UDPPayloadSize
}
c, err := client.NewUDPClient(client.UDPConfig{
Addr: parsed_url.Host,
config := client.UDPConfig{
URL: u,
PayloadSize: i.UDPPayload,
})
if err != nil {
return err
c, err := client.NewUDP(config)
}
conns = append(conns, c)
if err != nil {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
}
i.clients = append(i.clients, c)
default:
// If URL doesn't start with "udp", assume HTTP client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: u,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
config := client.HTTPConfig{
URL: u,
Timeout: i.Timeout.Duration,
TLSConfig: tlsCfg,
})
if err != nil {
return err
TLSConfig: tlsConfig,
UserAgent: i.UserAgent,
}
wp := client.WriteParams{
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
}
c, err := client.NewHTTP(config, wp)
if err != nil {
return fmt.Errorf("Error creating HTTP Client [%s]: %s", u, err)
}
i.clients = append(i.clients, c)
err = createDatabase(c, i.Database)
err = c.Query("CREATE DATABASE " + i.Database)
if err != nil {
log.Println("E! Database creation failed: " + err.Error())
continue
}
conns = append(conns, c)
}
}
i.conns = conns
rand.Seed(time.Now().UnixNano())
return nil
}
func createDatabase(c client.Client, database string) error {
// Create Database if it doesn't exist
_, err := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database),
})
return err
}
func (i *InfluxDB) Close() error {
var errS string
for j, _ := range i.conns {
if err := i.conns[j].Close(); err != nil {
errS += err.Error()
}
}
if errS != "" {
return fmt.Errorf("output influxdb close failed: %s", errS)
}
return nil
}
@@ -175,34 +151,24 @@ func (i *InfluxDB) Description() string {
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if len(i.conns) == 0 {
err := i.Connect()
if err != nil {
return err
}
}
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})
if err != nil {
return err
}
for _, metric := range metrics {
bp.AddPoint(metric.Point())
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
r := metric.NewReader(metrics)
}
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any InfluxDB server in cluster")
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
p := rand.Perm(len(i.conns))
p := rand.Perm(len(i.clients))
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
// If the database was not found, try to recreate it
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
// Log write failure:
log.Printf("E! InfluxDB Output Error: %s", e)
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
}
@@ -225,10 +191,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err
}
func init() {
outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
func newInflux() *InfluxDB {
return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
}
}
func init() {
outputs.Add("influxdb", func() telegraf.Output { return newInflux() })
}

View File

@@ -20,22 +20,123 @@ func TestUDPInflux(t *testing.T) {
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestHTTPInflux(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "test" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
}
// test that user agent is set properly
if r.UserAgent() != "telegraf" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
}
i := newInflux()
i.URLs = []string{ts.URL}
i.Database = "test"
i.UserAgent = "telegraf"
err := i.Connect()
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestUDPConnectError(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://foobar:8089"},
}
err := i.Connect()
require.Error(t, err)
i = InfluxDB{
URLs: []string{"udp://localhost:9999999"},
}
err = i.Connect()
require.Error(t, err)
}
func TestHTTPConnectError_InvalidURL(t *testing.T) {
i := InfluxDB{
URLs: []string{"http://foobar:8089"},
}
err := i.Connect()
require.Error(t, err)
i = InfluxDB{
URLs: []string{"http://localhost:9999999"},
}
err = i.Connect()
require.Error(t, err)
}
func TestHTTPConnectError_DatabaseCreateFail(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"test error"}`)
}
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: "test",
}
// database creation errors do not return an error from Connect
// they are only logged.
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestHTTPError_DatabaseNotFound(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
case "/query":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
}
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: "test",
}
err := i.Connect()
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.Error(t, err)
require.NoError(t, i.Close())
}