Fix influxdb output serialization on connection closed (#6621)

This commit is contained in:
陈方舟 2019-11-14 04:56:01 +08:00 committed by Daniel Nelson
parent 9a2b3bc917
commit fa2f0fff4e
8 changed files with 132 additions and 37 deletions

View File

@ -16,6 +16,7 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
"unicode" "unicode"
@ -50,6 +51,11 @@ type Number struct {
Value float64 Value float64
} }
type ReadWaitCloser struct {
pipeReader *io.PipeReader
wg sync.WaitGroup
}
// SetVersion sets the telegraf agent version // SetVersion sets the telegraf agent version
func SetVersion(v string) error { func SetVersion(v string) error {
if version != "" { if version != "" {
@ -281,14 +287,25 @@ func ExitStatus(err error) (int, bool) {
return 0, false return 0, false
} }
func (r *ReadWaitCloser) Close() error {
err := r.pipeReader.Close()
r.wg.Wait() // wait for the gzip goroutine finish
return err
}
// CompressWithGzip takes an io.Reader as input and pipes // CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing // it through a gzip.Writer returning an io.Reader containing
// the gzipped data. // the gzipped data.
// An error is returned if passing data to the gzip.Writer fails // An error is returned if passing data to the gzip.Writer fails
func CompressWithGzip(data io.Reader) (io.Reader, error) { func CompressWithGzip(data io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter) gzipWriter := gzip.NewWriter(pipeWriter)
rc := &ReadWaitCloser{
pipeReader: pipeReader,
}
rc.wg.Add(1)
var err error var err error
go func() { go func() {
_, err = io.Copy(gzipWriter, data) _, err = io.Copy(gzipWriter, data)
@ -296,6 +313,7 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) {
// subsequent reads from the read half of the pipe will // subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil. // return no bytes and the error err, or EOF if err is nil.
pipeWriter.CloseWithError(err) pipeWriter.CloseWithError(err)
rc.wg.Done()
}() }()
return pipeReader, err return pipeReader, err

View File

@ -3,6 +3,8 @@ package internal
import ( import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"crypto/rand"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"os/exec" "os/exec"
@ -232,6 +234,38 @@ func TestCompressWithGzip(t *testing.T) {
assert.Equal(t, testData, string(output)) assert.Equal(t, testData, string(output))
} }
type mockReader struct {
readN uint64 // record the number of calls to Read
}
func (r *mockReader) Read(p []byte) (n int, err error) {
r.readN++
return rand.Read(p)
}
func TestCompressWithGzipEarlyClose(t *testing.T) {
mr := &mockReader{}
rc, err := CompressWithGzip(mr)
assert.NoError(t, err)
n, err := io.CopyN(ioutil.Discard, rc, 10000)
assert.NoError(t, err)
assert.Equal(t, int64(10000), n)
r1 := mr.readN
err = rc.Close()
assert.NoError(t, err)
n, err = io.CopyN(ioutil.Discard, rc, 10000)
assert.Error(t, io.EOF, err)
assert.Equal(t, int64(0), n)
r2 := mr.readN
// no more read to the source after closing
assert.Equal(t, r1, r2)
}
func TestVersionAlreadySet(t *testing.T) { func TestVersionAlreadySet(t *testing.T) {
err := SetVersion("foo") err := SetVersion("foo")
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -153,6 +153,7 @@ func (h *HTTP) gatherURL(
if err != nil { if err != nil {
return err return err
} }
defer body.Close()
request, err := http.NewRequest(h.Method, url, body) request, err := http.NewRequest(h.Method, url, body)
if err != nil { if err != nil {
@ -216,16 +217,16 @@ func (h *HTTP) gatherURL(
return nil return nil
} }
func makeRequestBodyReader(contentEncoding, body string) (io.Reader, error) { func makeRequestBodyReader(contentEncoding, body string) (io.ReadCloser, error) {
var err error
var reader io.Reader = strings.NewReader(body) var reader io.Reader = strings.NewReader(body)
if contentEncoding == "gzip" { if contentEncoding == "gzip" {
reader, err = internal.CompressWithGzip(reader) rc, err := internal.CompressWithGzip(reader)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rc, nil
} }
return reader, nil return ioutil.NopCloser(reader), nil
} }
func init() { func init() {

View File

@ -176,10 +176,12 @@ func (h *HTTP) write(reqBody []byte) error {
var err error var err error
if h.ContentEncoding == "gzip" { if h.ContentEncoding == "gzip" {
reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer) rc, err := internal.CompressWithGzip(reqBodyBuffer)
if err != nil { if err != nil {
return err return err
} }
defer rc.Close()
reqBodyBuffer = rc
} }
req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer) req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -288,7 +289,12 @@ func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegr
return err return err
} }
reader := influx.NewReader(metrics, c.config.Serializer) reader, err := c.requestBodyReader(metrics)
if err != nil {
return err
}
defer reader.Close()
req, err := c.makeWriteRequest(url, reader) req, err := c.makeWriteRequest(url, reader)
if err != nil { if err != nil {
return err return err
@ -386,12 +392,6 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error var err error
if c.config.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", url, body) req, err := http.NewRequest("POST", url, body)
if err != nil { if err != nil {
@ -408,6 +408,23 @@ func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request
return req, nil return req, nil
} }
// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is usefully to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
reader := influx.NewReader(metrics, c.config.Serializer)
if c.config.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader)
if err != nil {
return nil, err
}
return rc, nil
}
return ioutil.NopCloser(reader), nil
}
func (c *httpClient) addHeaders(req *http.Request) { func (c *httpClient) addHeaders(req *http.Request) {
if c.config.Username != "" || c.config.Password != "" { if c.config.Username != "" || c.config.Password != "" {
req.SetBasicAuth(c.config.Username, c.config.Password) req.SetBasicAuth(c.config.Username, c.config.Password)

View File

@ -57,8 +57,7 @@ type InfluxDB struct {
CreateHTTPClientF func(config *HTTPConfig) (Client, error) CreateHTTPClientF func(config *HTTPConfig) (Client, error)
CreateUDPClientF func(config *UDPConfig) (Client, error) CreateUDPClientF func(config *UDPConfig) (Client, error)
serializer *influx.Serializer Log telegraf.Logger
Log telegraf.Logger
} }
var sampleConfig = ` var sampleConfig = `
@ -145,11 +144,6 @@ func (i *InfluxDB) Connect() error {
urls = append(urls, defaultURL) urls = append(urls, defaultURL)
} }
i.serializer = influx.NewSerializer()
if i.InfluxUintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}
for _, u := range urls { for _, u := range urls {
parts, err := url.Parse(u) parts, err := url.Parse(u)
if err != nil { if err != nil {
@ -237,7 +231,7 @@ func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
config := &UDPConfig{ config := &UDPConfig{
URL: url, URL: url,
MaxPayloadSize: int(i.UDPPayload.Size), MaxPayloadSize: int(i.UDPPayload.Size),
Serializer: i.serializer, Serializer: i.newSerializer(),
Log: i.Log, Log: i.Log,
} }
@ -271,7 +265,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
SkipDatabaseCreation: i.SkipDatabaseCreation, SkipDatabaseCreation: i.SkipDatabaseCreation,
RetentionPolicy: i.RetentionPolicy, RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency, Consistency: i.WriteConsistency,
Serializer: i.serializer, Serializer: i.newSerializer(),
Log: i.Log, Log: i.Log,
} }
@ -291,6 +285,15 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
return c, nil return c, nil
} }
func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.InfluxUintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}
return serializer
}
func init() { func init() {
outputs.Add("influxdb", func() telegraf.Output { outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{ return &InfluxDB{

View File

@ -7,6 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -214,7 +215,12 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
return err return err
} }
reader := influx.NewReader(metrics, c.serializer) reader, err := c.requestBodyReader(metrics)
if err != nil {
return err
}
defer reader.Close()
req, err := c.makeWriteRequest(url, reader) req, err := c.makeWriteRequest(url, reader)
if err != nil { if err != nil {
return err return err
@ -282,12 +288,6 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error var err error
if c.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", url, body) req, err := http.NewRequest("POST", url, body)
if err != nil { if err != nil {
@ -304,6 +304,23 @@ func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request
return req, nil return req, nil
} }
// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is usefully to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
reader := influx.NewReader(metrics, c.serializer)
if c.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader)
if err != nil {
return nil, err
}
return rc, nil
}
return ioutil.NopCloser(reader), nil
}
func (c *httpClient) addHeaders(req *http.Request) { func (c *httpClient) addHeaders(req *http.Request) {
for header, value := range c.Headers { for header, value := range c.Headers {
req.Header.Set(header, value) req.Header.Set(header, value)

View File

@ -96,8 +96,7 @@ type InfluxDB struct {
UintSupport bool `toml:"influx_uint_support"` UintSupport bool `toml:"influx_uint_support"`
tls.ClientConfig tls.ClientConfig
clients []Client clients []Client
serializer *influx.Serializer
} }
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
@ -107,11 +106,6 @@ func (i *InfluxDB) Connect() error {
i.URLs = append(i.URLs, defaultURL) i.URLs = append(i.URLs, defaultURL)
} }
i.serializer = influx.NewSerializer()
if i.UintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}
for _, u := range i.URLs { for _, u := range i.URLs {
parts, err := url.Parse(u) parts, err := url.Parse(u)
if err != nil { if err != nil {
@ -196,7 +190,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
UserAgent: i.UserAgent, UserAgent: i.UserAgent,
ContentEncoding: i.ContentEncoding, ContentEncoding: i.ContentEncoding,
TLSConfig: tlsConfig, TLSConfig: tlsConfig,
Serializer: i.serializer, Serializer: i.newSerializer(),
} }
c, err := NewHTTPClient(config) c, err := NewHTTPClient(config)
@ -207,6 +201,15 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
return c, nil return c, nil
} }
func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.UintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}
return serializer
}
func init() { func init() {
outputs.Add("influxdb_v2", func() telegraf.Output { outputs.Add("influxdb_v2", func() telegraf.Output {
return &InfluxDB{ return &InfluxDB{