Add entity-body compression to http output (#4807)

This commit is contained in:
Mihai Todor 2018-10-05 23:06:41 +01:00 committed by Daniel Nelson
parent fafe9d30bf
commit f3da717a88
7 changed files with 142 additions and 43 deletions

View File

@ -3,8 +3,10 @@ package internal
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip"
"crypto/rand" "crypto/rand"
"errors" "errors"
"io"
"log" "log"
"math/big" "math/big"
"os" "os"
@ -208,3 +210,23 @@ func ExitStatus(err error) (int, bool) {
} }
return 0, false return 0, false
} }
// CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing
// the gzipped data.
// An error is returned if passing data to the gzip.Writer fails
func CompressWithGzip(data io.Reader) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter)
var err error
go func() {
_, err = io.Copy(gzipWriter, data)
gzipWriter.Close()
// subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil.
pipeWriter.CloseWithError(err)
}()
return pipeReader, err
}

View File

@ -1,6 +1,9 @@
package internal package internal
import ( import (
"bytes"
"compress/gzip"
"io/ioutil"
"os/exec" "os/exec"
"testing" "testing"
"time" "time"
@ -162,3 +165,20 @@ func TestDuration(t *testing.T) {
d.UnmarshalTOML([]byte(`1.5`)) d.UnmarshalTOML([]byte(`1.5`))
assert.Equal(t, time.Second, d.Duration) assert.Equal(t, time.Second, d.Duration)
} }
func TestCompressWithGzip(t *testing.T) {
testData := "the quick brown fox jumps over the lazy dog"
inputBuffer := bytes.NewBuffer([]byte(testData))
outputBuffer, err := CompressWithGzip(inputBuffer)
assert.NoError(t, err)
gzipReader, err := gzip.NewReader(outputBuffer)
assert.NoError(t, err)
defer gzipReader.Close()
output, err := ioutil.ReadAll(gzipReader)
assert.NoError(t, err)
assert.Equal(t, testData, string(output))
}

View File

@ -44,4 +44,8 @@ data formats. For data_formats that support batching, metrics are sent in batch
# [outputs.http.headers] # [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format # # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8" # Content-Type = "text/plain; charset=utf-8"
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
``` ```

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
@ -55,6 +56,10 @@ var sampleConfig = `
# [outputs.http.headers] # [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format # # Should be set manually to "application/json" for json data_format
# Content-Type = "text/plain; charset=utf-8" # Content-Type = "text/plain; charset=utf-8"
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
` `
const ( const (
@ -64,16 +69,17 @@ const (
) )
type HTTP struct { type HTTP struct {
URL string `toml:"url"` URL string `toml:"url"`
Timeout internal.Duration `toml:"timeout"` Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"` Method string `toml:"method"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Headers map[string]string `toml:"headers"` Headers map[string]string `toml:"headers"`
ClientID string `toml:"client_id"` ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"` ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"` TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"` Scopes []string `toml:"scopes"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig tls.ClientConfig
client *http.Client client *http.Client
@ -162,7 +168,17 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error {
} }
func (h *HTTP) write(reqBody []byte) error { func (h *HTTP) write(reqBody []byte) error {
req, err := http.NewRequest(h.Method, h.URL, bytes.NewBuffer(reqBody)) var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody)
var err error
if h.ContentEncoding == "gzip" {
reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
}
req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
if err != nil { if err != nil {
return err return err
} }
@ -172,6 +188,9 @@ func (h *HTTP) write(reqBody []byte) error {
} }
req.Header.Set("Content-Type", defaultContentType) req.Header.Set("Content-Type", defaultContentType)
if h.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}
for k, v := range h.Headers { for k, v := range h.Headers {
req.Header.Set(k, v) req.Header.Set(k, v)
} }

View File

@ -1,7 +1,9 @@
package http package http
import ( import (
"compress/gzip"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
@ -227,6 +229,66 @@ func TestContentType(t *testing.T) {
} }
} }
func TestContentEncodingGzip(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
plugin *HTTP
payload string
expected string
}{
{
name: "default is no content encoding",
plugin: &HTTP{
URL: u.String(),
},
expected: "",
},
{
name: "overwrite content_encoding",
plugin: &HTTP{
URL: u.String(),
ContentEncoding: "gzip",
},
expected: "gzip",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, tt.expected, r.Header.Get("Content-Encoding"))
body := r.Body
var err error
if r.Header.Get("Content-Encoding") == "gzip" {
body, err = gzip.NewReader(r.Body)
require.NoError(t, err)
}
payload, err := ioutil.ReadAll(body)
require.NoError(t, err)
require.Contains(t, string(payload), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
})
serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}
func TestBasicAuth(t *testing.T) { func TestBasicAuth(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler()) ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close() defer ts.Close()

View File

@ -1,7 +1,6 @@
package influxdb package influxdb
import ( import (
"compress/gzip"
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
@ -16,6 +15,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
@ -360,7 +360,7 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
var err error var err error
if c.ContentEncoding == "gzip" { if c.ContentEncoding == "gzip" {
body, err = compressWithGzip(body) body, err = internal.CompressWithGzip(body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -381,20 +381,6 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
return req, nil return req, nil
} }
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error
go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()
return pr, err
}
func (c *httpClient) addHeaders(req *http.Request) { func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" { if c.Username != "" || c.Password != "" {
req.SetBasicAuth(c.Username, c.Password) req.SetBasicAuth(c.Username, c.Password)

View File

@ -1,7 +1,6 @@
package influxdb_v2 package influxdb_v2
import ( import (
"compress/gzip"
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
@ -17,6 +16,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
@ -231,7 +231,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
var err error var err error
if c.ContentEncoding == "gzip" { if c.ContentEncoding == "gzip" {
body, err = compressWithGzip(body) body, err = internal.CompressWithGzip(body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -258,20 +258,6 @@ func (c *httpClient) addHeaders(req *http.Request) {
} }
} }
func compressWithGzip(data io.Reader) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter)
var err error
go func() {
_, err = io.Copy(gzipWriter, data)
gzipWriter.Close()
pipeWriter.Close()
}()
return pipeReader, err
}
func makeWriteURL(loc url.URL, org, bucket string) (string, error) { func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
params := url.Values{} params := url.Values{}
params.Set("bucket", bucket) params.Set("bucket", bucket)