Add gzip content-encoding support to influxdb output (#2978)

This commit is contained in:
Bob Shannon 2017-08-14 17:50:15 -04:00 committed by Daniel Nelson
parent 68e6841a5c
commit 67fe167b79
4 changed files with 69 additions and 16 deletions

View File

@ -43,6 +43,9 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## HTTP Proxy Config ## HTTP Proxy Config
# http_proxy = "http://corporate.proxy:3128" # http_proxy = "http://corporate.proxy:3128"
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
``` ```
### Required parameters: ### Required parameters:
@ -67,3 +70,4 @@ to write to. Each URL should start with either `http://` or `udp://`
* `ssl_key`: SSL key * `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false) * `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `http_proxy`: HTTP Proxy URI * `http_proxy`: HTTP Proxy URI
* `content_encoding`: Compress each HTTP request payload using gzip if set to: "gzip"

View File

@ -2,6 +2,7 @@ package client
import ( import (
"bytes" "bytes"
"compress/gzip"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -94,9 +95,8 @@ type HTTPConfig struct {
// Proxy URL should be of the form "http://host:port" // Proxy URL should be of the form "http://host:port"
HTTPProxy string HTTPProxy string
// Gzip, if true, compresses each payload using gzip. // The content encoding mechanism to use for each request.
// TODO ContentEncoding string
// Gzip bool
} }
// Response represents a list of statement results. // Response represents a list of statement results.
@ -232,16 +232,24 @@ func (c *httpClient) makeWriteRequest(
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Set("Content-Length", fmt.Sprint(contentLength)) if c.config.ContentEncoding == "gzip" {
// TODO req.Header.Set("Content-Encoding", "gzip")
// if gzip { } else {
// req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Length", fmt.Sprint(contentLength))
// } }
return req, nil return req, nil
} }
func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) { func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequest("POST", uri, body) var req *http.Request
var err error
if c.config.ContentEncoding == "gzip" {
body, err = compressWithGzip(body)
if err != nil {
return nil, err
}
}
req, err = http.NewRequest("POST", uri, body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -253,6 +261,20 @@ func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, err
return req, nil return req, nil
} }
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error
go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()
return pr, err
}
func (c *httpClient) Close() error { func (c *httpClient) Close() error {
// Nothing to do. // Nothing to do.
return nil return nil

View File

@ -2,6 +2,7 @@ package client
import ( import (
"bytes" "bytes"
"compress/gzip"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -341,3 +342,24 @@ func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Contains(t, err.Error(), "json") assert.Contains(t, err.Error(), "json")
} }
func TestGzipCompression(t *testing.T) {
influxLine := "cpu value=99\n"
// Compress the payload using GZIP.
payload := bytes.NewReader([]byte(influxLine))
compressed, err := compressWithGzip(payload)
assert.Nil(t, err)
// Decompress the compressed payload and make sure
// that its original value has not changed.
gr, err := gzip.NewReader(compressed)
assert.Nil(t, err)
gr.Close()
var uncompressed bytes.Buffer
_, err = uncompressed.ReadFrom(gr)
assert.Nil(t, err)
assert.Equal(t, []byte(influxLine), uncompressed.Bytes())
}

View File

@ -34,6 +34,7 @@ type InfluxDB struct {
Timeout internal.Duration Timeout internal.Duration
UDPPayload int `toml:"udp_payload"` UDPPayload int `toml:"udp_payload"`
HTTPProxy string `toml:"http_proxy"` HTTPProxy string `toml:"http_proxy"`
ContentEncoding string `toml:"content_encoding"`
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -87,6 +88,9 @@ var sampleConfig = `
## HTTP Proxy Config ## HTTP Proxy Config
# http_proxy = "http://corporate.proxy:3128" # http_proxy = "http://corporate.proxy:3128"
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
` `
// Connect initiates the primary connection to the range of provided URLs // Connect initiates the primary connection to the range of provided URLs
@ -121,13 +125,14 @@ func (i *InfluxDB) Connect() error {
default: default:
// If URL doesn't start with "udp", assume HTTP client // If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{ config := client.HTTPConfig{
URL: u, URL: u,
Timeout: i.Timeout.Duration, Timeout: i.Timeout.Duration,
TLSConfig: tlsConfig, TLSConfig: tlsConfig,
UserAgent: i.UserAgent, UserAgent: i.UserAgent,
Username: i.Username, Username: i.Username,
Password: i.Password, Password: i.Password,
HTTPProxy: i.HTTPProxy, HTTPProxy: i.HTTPProxy,
ContentEncoding: i.ContentEncoding,
} }
wp := client.WriteParams{ wp := client.WriteParams{
Database: i.Database, Database: i.Database,