Revert using fasthttp library to net/http
This commit is contained in:
parent
f5d892d7d3
commit
a36fd375de
|
@ -6,6 +6,7 @@
|
||||||
|
|
||||||
- [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin.
|
- [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin.
|
||||||
- [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin.
|
- [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin.
|
||||||
|
- [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
2
Godeps
2
Godeps
|
@ -50,8 +50,6 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8
|
||||||
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
|
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
|
||||||
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
|
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
|
||||||
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
|
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
|
||||||
github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7
|
|
||||||
github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074
|
|
||||||
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
||||||
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
||||||
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
||||||
|
|
|
@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
||||||
if !bytes.HasSuffix(b, []byte("\n")) {
|
|
||||||
b = append(b, '\n')
|
|
||||||
}
|
|
||||||
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/valyala/fasthttp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -40,13 +40,15 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
|
||||||
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
|
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
|
||||||
}
|
}
|
||||||
|
|
||||||
wu := writeURL(u, defaultWP)
|
|
||||||
return &httpClient{
|
return &httpClient{
|
||||||
writeURL: []byte(wu),
|
writeURL: writeURL(u, defaultWP),
|
||||||
config: config,
|
config: config,
|
||||||
url: u,
|
url: u,
|
||||||
client: &fasthttp.Client{
|
client: &http.Client{
|
||||||
TLSConfig: config.TLSConfig,
|
Timeout: config.Timeout,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: config.TLSConfig,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -58,8 +60,13 @@ type HTTPConfig struct {
|
||||||
// UserAgent sets the User-Agent header.
|
// UserAgent sets the User-Agent header.
|
||||||
UserAgent string
|
UserAgent string
|
||||||
|
|
||||||
// Timeout is the time to wait for a response to each HTTP request (writes
|
// Timeout specifies a time limit for requests made by this
|
||||||
// and queries).
|
// Client. The timeout includes connection time, any
|
||||||
|
// redirects, and reading the response body. The timer remains
|
||||||
|
// running after Get, Head, Post, or Do return and will
|
||||||
|
// interrupt reading of the Response.Body.
|
||||||
|
//
|
||||||
|
// A Timeout of zero means no timeout.
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
|
||||||
// Username is the basic auth username for the server.
|
// Username is the basic auth username for the server.
|
||||||
|
@ -92,24 +99,27 @@ func (r *Response) Error() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpClient struct {
|
type httpClient struct {
|
||||||
writeURL []byte
|
writeURL string
|
||||||
config HTTPConfig
|
config HTTPConfig
|
||||||
client *fasthttp.Client
|
client *http.Client
|
||||||
url *url.URL
|
url *url.URL
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Query(command string) error {
|
func (c *httpClient) Query(command string) error {
|
||||||
req := c.makeRequest()
|
req, err := c.makeRequest(queryURL(c.url, command), bytes.NewReader([]byte("")))
|
||||||
req.Header.SetRequestURI(queryURL(c.url, command))
|
if err != nil {
|
||||||
|
return err
|
||||||
return c.doRequest(req, fasthttp.StatusOK)
|
}
|
||||||
|
return c.doRequest(req, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Write(b []byte) (int, error) {
|
func (c *httpClient) Write(b []byte) (int, error) {
|
||||||
req := c.makeWriteRequest(len(b), c.writeURL)
|
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
|
||||||
req.SetBody(b)
|
if err != nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
err := c.doRequest(req, fasthttp.StatusNoContent)
|
err = c.doRequest(req, http.StatusNoContent)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return len(b), nil
|
return len(b), nil
|
||||||
}
|
}
|
||||||
|
@ -117,10 +127,12 @@ func (c *httpClient) Write(b []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||||
req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp)))
|
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
|
||||||
req.SetBody(b)
|
if err != nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
err := c.doRequest(req, fasthttp.StatusNoContent)
|
err = c.doRequest(req, http.StatusNoContent)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return len(b), nil
|
return len(b), nil
|
||||||
}
|
}
|
||||||
|
@ -128,10 +140,12 @@ func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
req := c.makeWriteRequest(contentLength, c.writeURL)
|
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
|
||||||
req.SetBodyStream(r, contentLength)
|
if err != nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
err := c.doRequest(req, fasthttp.StatusNoContent)
|
err = c.doRequest(req, http.StatusNoContent)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return contentLength, nil
|
return contentLength, nil
|
||||||
}
|
}
|
||||||
|
@ -143,10 +157,12 @@ func (c *httpClient) WriteStreamWithParams(
|
||||||
contentLength int,
|
contentLength int,
|
||||||
wp WriteParams,
|
wp WriteParams,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp)))
|
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
|
||||||
req.SetBodyStream(r, contentLength)
|
if err != nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
err := c.doRequest(req, fasthttp.StatusNoContent)
|
err = c.doRequest(req, http.StatusNoContent)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return contentLength, nil
|
return contentLength, nil
|
||||||
}
|
}
|
||||||
|
@ -154,24 +170,27 @@ func (c *httpClient) WriteStreamWithParams(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) doRequest(
|
func (c *httpClient) doRequest(
|
||||||
req *fasthttp.Request,
|
req *http.Request,
|
||||||
expectedCode int,
|
expectedCode int,
|
||||||
) error {
|
) error {
|
||||||
resp := fasthttp.AcquireResponse()
|
resp, err := c.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err := c.client.DoTimeout(req, resp, c.config.Timeout)
|
code := resp.StatusCode
|
||||||
|
|
||||||
code := resp.StatusCode()
|
|
||||||
// If it's a "no content" response, then release and return nil
|
// If it's a "no content" response, then release and return nil
|
||||||
if code == fasthttp.StatusNoContent {
|
if code == http.StatusNoContent {
|
||||||
fasthttp.ReleaseResponse(resp)
|
|
||||||
fasthttp.ReleaseRequest(req)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// not a "no content" response, so parse the result:
|
// not a "no content" response, so parse the result:
|
||||||
var response Response
|
var response Response
|
||||||
decErr := json.Unmarshal(resp.Body(), &response)
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Fatal error reading body: %s", err)
|
||||||
|
}
|
||||||
|
decErr := json.Unmarshal(body, &response)
|
||||||
|
|
||||||
// If we got a JSON decode error, send that back
|
// If we got a JSON decode error, send that back
|
||||||
if decErr != nil {
|
if decErr != nil {
|
||||||
|
@ -184,35 +203,37 @@ func (c *httpClient) doRequest(
|
||||||
code, expectedCode, response.Error())
|
code, expectedCode, response.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
fasthttp.ReleaseResponse(resp)
|
|
||||||
fasthttp.ReleaseRequest(req)
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) makeWriteRequest(
|
func (c *httpClient) makeWriteRequest(
|
||||||
|
body io.Reader,
|
||||||
contentLength int,
|
contentLength int,
|
||||||
writeURL []byte,
|
writeURL string,
|
||||||
) *fasthttp.Request {
|
) (*http.Request, error) {
|
||||||
req := c.makeRequest()
|
req, err := c.makeRequest(writeURL, body)
|
||||||
req.Header.SetContentLength(contentLength)
|
if err != nil {
|
||||||
req.Header.SetRequestURIBytes(writeURL)
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
|
||||||
// TODO
|
// TODO
|
||||||
// if gzip {
|
// if gzip {
|
||||||
// req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
|
// req.Header.Set("Content-Encoding", "gzip")
|
||||||
// }
|
// }
|
||||||
return req
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) makeRequest() *fasthttp.Request {
|
func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) {
|
||||||
req := fasthttp.AcquireRequest()
|
req, err := http.NewRequest("POST", uri, body)
|
||||||
req.Header.SetContentTypeBytes([]byte("text/plain"))
|
if err != nil {
|
||||||
req.Header.SetMethodBytes([]byte("POST"))
|
return nil, err
|
||||||
req.Header.SetUserAgent(c.config.UserAgent)
|
|
||||||
if c.config.Username != "" && c.config.Password != "" {
|
|
||||||
req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password))
|
|
||||||
}
|
}
|
||||||
return req
|
req.Header.Set("Content-Type", "text/plain")
|
||||||
|
req.Header.Set("User-Agent", c.config.UserAgent)
|
||||||
|
if c.config.Username != "" && c.config.Password != "" {
|
||||||
|
req.SetBasicAuth(c.config.Username, c.config.Password)
|
||||||
|
}
|
||||||
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Close() error {
|
func (c *httpClient) Close() error {
|
||||||
|
@ -246,13 +267,3 @@ func queryURL(u *url.URL, command string) string {
|
||||||
u.Path = "query"
|
u.Path = "query"
|
||||||
return u.String()
|
return u.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt
|
|
||||||
// "To receive authorization, the httpClient sends the userid and password,
|
|
||||||
// separated by a single colon (":") character, within a base64
|
|
||||||
// encoded string in the credentials."
|
|
||||||
// It is not meant to be urlencoded.
|
|
||||||
func basicAuth(username, password string) string {
|
|
||||||
auth := username + ":" + password
|
|
||||||
return base64.StdEncoding.EncodeToString([]byte(auth))
|
|
||||||
}
|
|
||||||
|
|
|
@ -99,8 +99,8 @@ func (i *InfluxDB) Connect() error {
|
||||||
config := client.UDPConfig{
|
config := client.UDPConfig{
|
||||||
URL: u,
|
URL: u,
|
||||||
PayloadSize: i.UDPPayload,
|
PayloadSize: i.UDPPayload,
|
||||||
c, err := client.NewUDP(config)
|
|
||||||
}
|
}
|
||||||
|
c, err := client.NewUDP(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||||
}
|
}
|
||||||
|
@ -154,8 +154,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
bufsize := 0
|
bufsize := 0
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
bufsize += m.Len()
|
bufsize += m.Len()
|
||||||
r := metric.NewReader(metrics)
|
|
||||||
}
|
}
|
||||||
|
r := metric.NewReader(metrics)
|
||||||
|
|
||||||
// This will get set to nil if a successful write occurs
|
// This will get set to nil if a successful write occurs
|
||||||
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
|
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
|
||||||
|
@ -163,9 +163,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
p := rand.Perm(len(i.clients))
|
p := rand.Perm(len(i.clients))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
||||||
// Log write failure:
|
|
||||||
log.Printf("E! InfluxDB Output Error: %s", e)
|
|
||||||
|
|
||||||
// If the database was not found, try to recreate it:
|
// If the database was not found, try to recreate it:
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
|
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
|
||||||
|
|
|
@ -140,3 +140,27 @@ func TestHTTPError_DatabaseNotFound(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.NoError(t, i.Close())
|
require.NoError(t, i.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// field type conflict does not return an error, instead we
|
||||||
|
func TestHTTPError_FieldTypeConflict(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}],"error":"field type conflict: input field \"value\" on measurement \"test\" is type integer, already exists as type float dropped=1"}`)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{ts.URL},
|
||||||
|
Database: "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
err := i.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = i.Write(testutil.MockMetrics())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue