diff --git a/CHANGELOG.md b/CHANGELOG.md index 58912b2fe..7ebea6184 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin. - [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin. +- [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations. ### Bugfixes diff --git a/Godeps b/Godeps index 83b9e4561..99606414e 100644 --- a/Godeps +++ b/Godeps @@ -50,8 +50,6 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8 github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c -github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7 -github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 05551a966..0f426f809 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { } func (h *HTTPListener) parse(b []byte, t time.Time) error { - if !bytes.HasSuffix(b, []byte("\n")) { - b = append(b, '\n') - } metrics, err := h.parser.ParseWithDefaultTime(b, t) for _, m := range metrics { diff --git a/plugins/outputs/influxdb/client/http.go b/plugins/outputs/influxdb/client/http.go index 68cc3e094..62ca1315b 100644 --- a/plugins/outputs/influxdb/client/http.go +++ b/plugins/outputs/influxdb/client/http.go @@ -1,15 +1,15 @@ package client import ( + "bytes" "crypto/tls" - "encoding/base64" "encoding/json" "fmt" "io" + "io/ioutil" + "net/http" "net/url" "time" - - "github.com/valyala/fasthttp" ) var ( @@ -40,13 +40,15 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) { return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme) } - wu := writeURL(u, defaultWP) return &httpClient{ - writeURL: []byte(wu), + writeURL: writeURL(u, defaultWP), config: config, url: u, - client: &fasthttp.Client{ - TLSConfig: config.TLSConfig, + client: &http.Client{ + Timeout: config.Timeout, + Transport: &http.Transport{ + TLSClientConfig: config.TLSConfig, + }, }, }, nil } @@ -58,8 +60,13 @@ type HTTPConfig struct { // UserAgent sets the User-Agent header. UserAgent string - // Timeout is the time to wait for a response to each HTTP request (writes - // and queries). + // Timeout specifies a time limit for requests made by this + // Client. The timeout includes connection time, any + // redirects, and reading the response body. The timer remains + // running after Get, Head, Post, or Do return and will + // interrupt reading of the Response.Body. + // + // A Timeout of zero means no timeout. Timeout time.Duration // Username is the basic auth username for the server. @@ -92,24 +99,27 @@ func (r *Response) Error() error { } type httpClient struct { - writeURL []byte + writeURL string config HTTPConfig - client *fasthttp.Client + client *http.Client url *url.URL } func (c *httpClient) Query(command string) error { - req := c.makeRequest() - req.Header.SetRequestURI(queryURL(c.url, command)) - - return c.doRequest(req, fasthttp.StatusOK) + req, err := c.makeRequest(queryURL(c.url, command), bytes.NewReader([]byte(""))) + if err != nil { + return err + } + return c.doRequest(req, http.StatusOK) } func (c *httpClient) Write(b []byte) (int, error) { - req := c.makeWriteRequest(len(b), c.writeURL) - req.SetBody(b) + req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL) + if err != nil { + return 0, nil + } - err := c.doRequest(req, fasthttp.StatusNoContent) + err = c.doRequest(req, http.StatusNoContent) if err == nil { return len(b), nil } @@ -117,10 +127,12 @@ func (c *httpClient) Write(b []byte) (int, error) { } func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { - req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp))) - req.SetBody(b) + req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp)) + if err != nil { + return 0, nil + } - err := c.doRequest(req, fasthttp.StatusNoContent) + err = c.doRequest(req, http.StatusNoContent) if err == nil { return len(b), nil } @@ -128,10 +140,12 @@ func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { } func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) { - req := c.makeWriteRequest(contentLength, c.writeURL) - req.SetBodyStream(r, contentLength) + req, err := c.makeWriteRequest(r, contentLength, c.writeURL) + if err != nil { + return 0, nil + } - err := c.doRequest(req, fasthttp.StatusNoContent) + err = c.doRequest(req, http.StatusNoContent) if err == nil { return contentLength, nil } @@ -143,10 +157,12 @@ func (c *httpClient) WriteStreamWithParams( contentLength int, wp WriteParams, ) (int, error) { - req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp))) - req.SetBodyStream(r, contentLength) + req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp)) + if err != nil { + return 0, nil + } - err := c.doRequest(req, fasthttp.StatusNoContent) + err = c.doRequest(req, http.StatusNoContent) if err == nil { return contentLength, nil } @@ -154,24 +170,27 @@ func (c *httpClient) WriteStreamWithParams( } func (c *httpClient) doRequest( - req *fasthttp.Request, + req *http.Request, expectedCode int, ) error { - resp := fasthttp.AcquireResponse() + resp, err := c.client.Do(req) + if err != nil { + return err + } - err := c.client.DoTimeout(req, resp, c.config.Timeout) - - code := resp.StatusCode() + code := resp.StatusCode // If it's a "no content" response, then release and return nil - if code == fasthttp.StatusNoContent { - fasthttp.ReleaseResponse(resp) - fasthttp.ReleaseRequest(req) + if code == http.StatusNoContent { return nil } // not a "no content" response, so parse the result: var response Response - decErr := json.Unmarshal(resp.Body(), &response) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("Fatal error reading body: %s", err) + } + decErr := json.Unmarshal(body, &response) // If we got a JSON decode error, send that back if decErr != nil { @@ -184,35 +203,37 @@ func (c *httpClient) doRequest( code, expectedCode, response.Error()) } - fasthttp.ReleaseResponse(resp) - fasthttp.ReleaseRequest(req) - return err } func (c *httpClient) makeWriteRequest( + body io.Reader, contentLength int, - writeURL []byte, -) *fasthttp.Request { - req := c.makeRequest() - req.Header.SetContentLength(contentLength) - req.Header.SetRequestURIBytes(writeURL) + writeURL string, +) (*http.Request, error) { + req, err := c.makeRequest(writeURL, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Length", fmt.Sprint(contentLength)) // TODO // if gzip { - // req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip")) + // req.Header.Set("Content-Encoding", "gzip") // } - return req + return req, nil } -func (c *httpClient) makeRequest() *fasthttp.Request { - req := fasthttp.AcquireRequest() - req.Header.SetContentTypeBytes([]byte("text/plain")) - req.Header.SetMethodBytes([]byte("POST")) - req.Header.SetUserAgent(c.config.UserAgent) - if c.config.Username != "" && c.config.Password != "" { - req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password)) +func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) { + req, err := http.NewRequest("POST", uri, body) + if err != nil { + return nil, err } - return req + req.Header.Set("Content-Type", "text/plain") + req.Header.Set("User-Agent", c.config.UserAgent) + if c.config.Username != "" && c.config.Password != "" { + req.SetBasicAuth(c.config.Username, c.config.Password) + } + return req, nil } func (c *httpClient) Close() error { @@ -246,13 +267,3 @@ func queryURL(u *url.URL, command string) string { u.Path = "query" return u.String() } - -// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt -// "To receive authorization, the httpClient sends the userid and password, -// separated by a single colon (":") character, within a base64 -// encoded string in the credentials." -// It is not meant to be urlencoded. -func basicAuth(username, password string) string { - auth := username + ":" + password - return base64.StdEncoding.EncodeToString([]byte(auth)) -} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 06d8bd042..5a5899a60 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -99,8 +99,8 @@ func (i *InfluxDB) Connect() error { config := client.UDPConfig{ URL: u, PayloadSize: i.UDPPayload, - c, err := client.NewUDP(config) } + c, err := client.NewUDP(config) if err != nil { return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) } @@ -154,8 +154,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { bufsize := 0 for _, m := range metrics { bufsize += m.Len() - r := metric.NewReader(metrics) } + r := metric.NewReader(metrics) // This will get set to nil if a successful write occurs err := fmt.Errorf("Could not write to any InfluxDB server in cluster") @@ -163,9 +163,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { p := rand.Perm(len(i.clients)) for _, n := range p { if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { - // Log write failure: - log.Printf("E! InfluxDB Output Error: %s", e) - // If the database was not found, try to recreate it: if strings.Contains(e.Error(), "database not found") { if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil { diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index db2cd5ec7..0ece8a1c2 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -140,3 +140,27 @@ func TestHTTPError_DatabaseNotFound(t *testing.T) { require.Error(t, err) require.NoError(t, i.Close()) } + +// field type conflict does not return an error, instead we +func TestHTTPError_FieldTypeConflict(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"field type conflict: input field \"value\" on measurement \"test\" is type integer, already exists as type float dropped=1"}`) + } + })) + defer ts.Close() + + i := InfluxDB{ + URLs: []string{ts.URL}, + Database: "test", + } + + err := i.Connect() + require.NoError(t, err) + err = i.Write(testutil.MockMetrics()) + require.NoError(t, err) + require.NoError(t, i.Close()) +}