diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index 49fdc731c..36d64c219 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -43,6 +43,9 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP. ## HTTP Proxy Config # http_proxy = "http://corporate.proxy:3128" + + ## Compress each HTTP request payload using GZIP. + # content_encoding = "gzip" ``` ### Required parameters: @@ -67,3 +70,4 @@ to write to. Each URL should start with either `http://` or `udp://` * `ssl_key`: SSL key * `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) * `http_proxy`: HTTP Proxy URI +* `content_encoding`: Compress each HTTP request payload using gzip if set to: "gzip" diff --git a/plugins/outputs/influxdb/client/http.go b/plugins/outputs/influxdb/client/http.go index 6a757982d..957c490ed 100644 --- a/plugins/outputs/influxdb/client/http.go +++ b/plugins/outputs/influxdb/client/http.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "compress/gzip" "crypto/tls" "encoding/json" "fmt" @@ -94,9 +95,8 @@ type HTTPConfig struct { // Proxy URL should be of the form "http://host:port" HTTPProxy string - // Gzip, if true, compresses each payload using gzip. - // TODO - // Gzip bool + // The content encoding mechanism to use for each request. + ContentEncoding string } // Response represents a list of statement results. @@ -232,16 +232,24 @@ func (c *httpClient) makeWriteRequest( if err != nil { return nil, err } - req.Header.Set("Content-Length", fmt.Sprint(contentLength)) - // TODO - // if gzip { - // req.Header.Set("Content-Encoding", "gzip") - // } + if c.config.ContentEncoding == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } else { + req.Header.Set("Content-Length", fmt.Sprint(contentLength)) + } return req, nil } func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) { - req, err := http.NewRequest("POST", uri, body) + var req *http.Request + var err error + if c.config.ContentEncoding == "gzip" { + body, err = compressWithGzip(body) + if err != nil { + return nil, err + } + } + req, err = http.NewRequest("POST", uri, body) if err != nil { return nil, err } @@ -253,6 +261,20 @@ func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, err return req, nil } +func compressWithGzip(data io.Reader) (io.Reader, error) { + pr, pw := io.Pipe() + gw := gzip.NewWriter(pw) + var err error + + go func() { + _, err = io.Copy(gw, data) + gw.Close() + pw.Close() + }() + + return pr, err +} + func (c *httpClient) Close() error { // Nothing to do. return nil diff --git a/plugins/outputs/influxdb/client/http_test.go b/plugins/outputs/influxdb/client/http_test.go index 8fa02dd22..7f3bd7e34 100644 --- a/plugins/outputs/influxdb/client/http_test.go +++ b/plugins/outputs/influxdb/client/http_test.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "compress/gzip" "fmt" "net/http" "net/http/httptest" @@ -341,3 +342,24 @@ func TestHTTPClient_Query_JSONDecodeError(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "json") } + +func TestGzipCompression(t *testing.T) { + influxLine := "cpu value=99\n" + + // Compress the payload using GZIP. + payload := bytes.NewReader([]byte(influxLine)) + compressed, err := compressWithGzip(payload) + assert.Nil(t, err) + + // Decompress the compressed payload and make sure + // that its original value has not changed. + gr, err := gzip.NewReader(compressed) + assert.Nil(t, err) + gr.Close() + + var uncompressed bytes.Buffer + _, err = uncompressed.ReadFrom(gr) + assert.Nil(t, err) + + assert.Equal(t, []byte(influxLine), uncompressed.Bytes()) +} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 1028bc257..23f567820 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -34,6 +34,7 @@ type InfluxDB struct { Timeout internal.Duration UDPPayload int `toml:"udp_payload"` HTTPProxy string `toml:"http_proxy"` + ContentEncoding string `toml:"content_encoding"` // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -87,6 +88,9 @@ var sampleConfig = ` ## HTTP Proxy Config # http_proxy = "http://corporate.proxy:3128" + + ## Compress each HTTP request payload using GZIP. + # content_encoding = "gzip" ` // Connect initiates the primary connection to the range of provided URLs @@ -121,13 +125,14 @@ func (i *InfluxDB) Connect() error { default: // If URL doesn't start with "udp", assume HTTP client config := client.HTTPConfig{ - URL: u, - Timeout: i.Timeout.Duration, - TLSConfig: tlsConfig, - UserAgent: i.UserAgent, - Username: i.Username, - Password: i.Password, - HTTPProxy: i.HTTPProxy, + URL: u, + Timeout: i.Timeout.Duration, + TLSConfig: tlsConfig, + UserAgent: i.UserAgent, + Username: i.Username, + Password: i.Password, + HTTPProxy: i.HTTPProxy, + ContentEncoding: i.ContentEncoding, } wp := client.WriteParams{ Database: i.Database,