From f5d892d7d341d5d1c8936b885397083a36e9d087 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sun, 4 Dec 2016 20:18:13 +0000 Subject: [PATCH] Improve the InfluxDB through-put performance This changes the current use of the InfluxDB client to instead use a baked-in client that uses the fasthttp library. This allows for significantly smaller allocations, the re-use of http body buffers, and the re-use of the actual bytes of the line-protocol metric representations. --- Godeps | 2 + metric.go | 9 +- metric/metric.go | 42 ++ metric/reader.go | 155 ++++++ metric/reader_test.go | 487 ++++++++++++++++++ plugins/inputs/http_listener/http_listener.go | 3 + plugins/outputs/influxdb/client/client.go | 22 + plugins/outputs/influxdb/client/http.go | 258 ++++++++++ plugins/outputs/influxdb/client/http_test.go | 343 ++++++++++++ plugins/outputs/influxdb/client/udp.go | 99 ++++ plugins/outputs/influxdb/client/udp_test.go | 163 ++++++ plugins/outputs/influxdb/influxdb.go | 124 ++--- plugins/outputs/influxdb/influxdb_test.go | 113 +++- 13 files changed, 1735 insertions(+), 85 deletions(-) create mode 100644 metric/reader.go create mode 100644 metric/reader_test.go create mode 100644 plugins/outputs/influxdb/client/client.go create mode 100644 plugins/outputs/influxdb/client/http.go create mode 100644 plugins/outputs/influxdb/client/http_test.go create mode 100644 plugins/outputs/influxdb/client/udp.go create mode 100644 plugins/outputs/influxdb/client/udp_test.go diff --git a/Godeps b/Godeps index 99606414e..83b9e4561 100644 --- a/Godeps +++ b/Godeps @@ -50,6 +50,8 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8 github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c +github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7 +github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 diff --git a/metric.go b/metric.go index cb230512f..b1ab1b29f 100644 --- a/metric.go +++ b/metric.go @@ -19,8 +19,15 @@ const ( ) type Metric interface { + // Serialize serializes the metric into a line-protocol byte buffer, + // including a newline at the end. Serialize() []byte - String() string // convenience function for string(Serialize()) + // same as Serialize, but avoids an allocation. + // returns number of bytes copied into dst. + SerializeTo(dst []byte) int + // String is the same as Serialize, but returns a string. + String() string + // Copy deep-copies the metric. Copy() Metric // Split will attempt to return multiple metrics with the same timestamp // whose string representations are no longer than maxSize. diff --git a/metric/metric.go b/metric/metric.go index 8a18c0f2c..4fbee0ad1 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -178,6 +178,48 @@ func (m *metric) Serialize() []byte { return tmp } +func (m *metric) SerializeTo(dst []byte) int { + i := 0 + if i >= len(dst) { + return i + } + + i += copy(dst[i:], m.name) + if i >= len(dst) { + return i + } + + i += copy(dst[i:], m.tags) + if i >= len(dst) { + return i + } + + dst[i] = ' ' + i++ + if i >= len(dst) { + return i + } + + i += copy(dst[i:], m.fields) + if i >= len(dst) { + return i + } + + dst[i] = ' ' + i++ + if i >= len(dst) { + return i + } + + i += copy(dst[i:], m.t) + if i >= len(dst) { + return i + } + dst[i] = '\n' + + return i + 1 +} + func (m *metric) Split(maxSize int) []telegraf.Metric { if m.Len() < maxSize { return []telegraf.Metric{m} diff --git a/metric/reader.go b/metric/reader.go new file mode 100644 index 000000000..df0729963 --- /dev/null +++ b/metric/reader.go @@ -0,0 +1,155 @@ +package metric + +import ( + "io" + + "github.com/influxdata/telegraf" +) + +type state int + +const ( + _ state = iota + // normal state copies whole metrics into the given buffer until we can't + // fit the next metric. + normal + // split state means that we have a metric that we were able to split, so + // that we can fit it into multiple metrics (and calls to Read) + split + // overflow state means that we have a metric that didn't fit into a single + // buffer, and needs to be split across multiple calls to Read. + overflow + // splitOverflow state means that a split metric didn't fit into a single + // buffer, and needs to be split across multiple calls to Read. + splitOverflow + // done means we're done reading metrics, and now always return (0, io.EOF) + done +) + +type reader struct { + metrics []telegraf.Metric + splitMetrics []telegraf.Metric + buf []byte + state state + + // metric index + iM int + // split metric index + iSM int + // buffer index + iB int +} + +func NewReader(metrics []telegraf.Metric) io.Reader { + return &reader{ + metrics: metrics, + state: normal, + } +} + +func (r *reader) Read(p []byte) (n int, err error) { + var i int + switch r.state { + case done: + return 0, io.EOF + case normal: + for { + // this for-loop is the sunny-day scenario, where we are given a + // buffer that is large enough to hold at least a single metric. + // all of the cases below it are edge-cases. + if r.metrics[r.iM].Len() < len(p[i:]) { + i += r.metrics[r.iM].SerializeTo(p[i:]) + } else { + break + } + r.iM++ + if r.iM == len(r.metrics) { + r.state = done + return i, io.EOF + } + } + + // if we haven't written any bytes, check if we can split the current + // metric into multiple full metrics at a smaller size. + if i == 0 { + tmp := r.metrics[r.iM].Split(len(p)) + if len(tmp) > 1 { + r.splitMetrics = tmp + r.state = split + if r.splitMetrics[0].Len() < len(p) { + i += r.splitMetrics[0].SerializeTo(p) + r.iSM = 1 + } else { + // splitting didn't quite work, so we'll drop down and + // overflow the metric. + r.state = normal + r.iSM = 0 + } + } + } + + // if we haven't written any bytes and we're not at the end of the metrics + // slice, then it means we have a single metric that is larger than the + // provided buffer. + if i == 0 { + r.buf = r.metrics[r.iM].Serialize() + i += copy(p, r.buf[r.iB:]) + r.iB += i + r.state = overflow + } + + case split: + if r.splitMetrics[r.iSM].Len() < len(p) { + // write the current split metric + i += r.splitMetrics[r.iSM].SerializeTo(p) + r.iSM++ + if r.iSM >= len(r.splitMetrics) { + // done writing the current split metrics + r.iSM = 0 + r.iM++ + if r.iM == len(r.metrics) { + r.state = done + return i, io.EOF + } + r.state = normal + } + } else { + // This would only happen if we split the metric, and then a + // subsequent buffer was smaller than the initial one given, + // so that our split metric no longer fits. + r.buf = r.splitMetrics[r.iSM].Serialize() + i += copy(p, r.buf[r.iB:]) + r.iB += i + r.state = splitOverflow + } + + case splitOverflow: + i = copy(p, r.buf[r.iB:]) + r.iB += i + if r.iB >= len(r.buf) { + r.iB = 0 + r.iSM++ + if r.iSM == len(r.splitMetrics) { + r.iM++ + r.state = normal + } else { + r.state = split + } + } + + case overflow: + i = copy(p, r.buf[r.iB:]) + r.iB += i + if r.iB >= len(r.buf) { + r.iB = 0 + r.iM++ + if r.iM == len(r.metrics) { + r.state = done + return i, io.EOF + } + r.state = normal + } + } + + return i, nil +} diff --git a/metric/reader_test.go b/metric/reader_test.go new file mode 100644 index 000000000..a1c864ad5 --- /dev/null +++ b/metric/reader_test.go @@ -0,0 +1,487 @@ +package metric + +import ( + "io" + "io/ioutil" + "regexp" + "testing" + "time" + + "github.com/influxdata/telegraf" + + "github.com/stretchr/testify/assert" +) + +func BenchmarkMetricReader(b *testing.B) { + metrics := make([]telegraf.Metric, 10) + for i := 0; i < 10; i++ { + metrics[i], _ = New("foo", map[string]string{}, + map[string]interface{}{"value": int64(1)}, time.Now()) + } + for n := 0; n < b.N; n++ { + r := NewReader(metrics) + io.Copy(ioutil.Discard, r) + } +} + +func TestMetricReader(t *testing.T) { + ts := time.Unix(1481032190, 0) + metrics := make([]telegraf.Metric, 10) + for i := 0; i < 10; i++ { + metrics[i], _ = New("foo", map[string]string{}, + map[string]interface{}{"value": int64(1)}, ts) + } + + r := NewReader(metrics) + + buf := make([]byte, 35) + for i := 0; i < 10; i++ { + n, err := r.Read(buf) + if err != nil { + assert.True(t, err == io.EOF, err.Error()) + } + assert.Equal(t, 33, n) + assert.Equal(t, "foo value=1i 1481032190000000000\n", string(buf[0:n])) + } + + // reader should now be done, and always return 0, io.EOF + for i := 0; i < 10; i++ { + n, err := r.Read(buf) + assert.True(t, err == io.EOF, err.Error()) + assert.Equal(t, 0, n) + } +} + +func TestMetricReader_OverflowMetric(t *testing.T) { + ts := time.Unix(1481032190, 0) + m, _ := New("foo", map[string]string{}, + map[string]interface{}{"value": int64(10)}, ts) + metrics := []telegraf.Metric{m} + + r := NewReader(metrics) + buf := make([]byte, 5) + + tests := []struct { + exp string + err error + n int + }{ + { + "foo v", + nil, + 5, + }, + { + "alue=", + nil, + 5, + }, + { + "10i 1", + nil, + 5, + }, + { + "48103", + nil, + 5, + }, + { + "21900", + nil, + 5, + }, + { + "00000", + nil, + 5, + }, + { + "000\n", + io.EOF, + 4, + }, + { + "", + io.EOF, + 0, + }, + } + + for _, test := range tests { + n, err := r.Read(buf) + assert.Equal(t, test.n, n) + assert.Equal(t, test.exp, string(buf[0:n])) + assert.Equal(t, test.err, err) + } +} + +func TestMetricReader_OverflowMultipleMetrics(t *testing.T) { + ts := time.Unix(1481032190, 0) + m, _ := New("foo", map[string]string{}, + map[string]interface{}{"value": int64(10)}, ts) + metrics := []telegraf.Metric{m, m.Copy()} + + r := NewReader(metrics) + buf := make([]byte, 10) + + tests := []struct { + exp string + err error + n int + }{ + { + "foo value=", + nil, + 10, + }, + { + "10i 148103", + nil, + 10, + }, + { + "2190000000", + nil, + 10, + }, + { + "000\n", + nil, + 4, + }, + { + "foo value=", + nil, + 10, + }, + { + "10i 148103", + nil, + 10, + }, + { + "2190000000", + nil, + 10, + }, + { + "000\n", + io.EOF, + 4, + }, + { + "", + io.EOF, + 0, + }, + } + + for _, test := range tests { + n, err := r.Read(buf) + assert.Equal(t, test.n, n) + assert.Equal(t, test.exp, string(buf[0:n])) + assert.Equal(t, test.err, err) + } +} + +// test splitting a metric +func TestMetricReader_SplitMetric(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + "value2": int64(10), + "value3": int64(10), + "value4": int64(10), + "value5": int64(10), + "value6": int64(10), + }, + ts, + ) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, 60) + + tests := []struct { + expRegex string + err error + n int + }{ + { + `foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`, + nil, + 57, + }, + { + `foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`, + io.EOF, + 57, + }, + { + "", + io.EOF, + 0, + }, + } + + for _, test := range tests { + n, err := r.Read(buf) + assert.Equal(t, test.n, n) + re := regexp.MustCompile(test.expRegex) + assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n])) + assert.Equal(t, test.err, err) + } +} + +// test an array with one split metric and one unsplit +func TestMetricReader_SplitMetric2(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + "value2": int64(10), + "value3": int64(10), + "value4": int64(10), + "value5": int64(10), + "value6": int64(10), + }, + ts, + ) + m2, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + }, + ts, + ) + metrics := []telegraf.Metric{m1, m2} + + r := NewReader(metrics) + buf := make([]byte, 60) + + tests := []struct { + expRegex string + err error + n int + }{ + { + `foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`, + nil, + 57, + }, + { + `foo value\d=10i,value\d=10i,value\d=10i 1481032190000000000\n`, + nil, + 57, + }, + { + `foo value1=10i 1481032190000000000\n`, + io.EOF, + 35, + }, + { + "", + io.EOF, + 0, + }, + } + + for _, test := range tests { + n, err := r.Read(buf) + assert.Equal(t, test.n, n) + re := regexp.MustCompile(test.expRegex) + assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n])) + assert.Equal(t, test.err, err) + } +} + +// test split that results in metrics that are still too long, which results in +// the reader falling back to regular overflow. +func TestMetricReader_SplitMetricTooLong(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + "value2": int64(10), + }, + ts, + ) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, 30) + + tests := []struct { + expRegex string + err error + n int + }{ + { + `foo value\d=10i,value\d=10i 1481`, + nil, + 30, + }, + { + `032190000000000\n`, + io.EOF, + 16, + }, + { + "", + io.EOF, + 0, + }, + } + + for _, test := range tests { + n, err := r.Read(buf) + assert.Equal(t, test.n, n) + re := regexp.MustCompile(test.expRegex) + assert.True(t, re.MatchString(string(buf[0:n])), string(buf[0:n])) + assert.Equal(t, test.err, err) + } +} + +// test split with a changing buffer size in the middle of subsequent calls +// to Read +func TestMetricReader_SplitMetricChangingBuffer(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + "value2": int64(10), + "value3": int64(10), + }, + ts, + ) + m2, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + }, + ts, + ) + metrics := []telegraf.Metric{m1, m2} + + r := NewReader(metrics) + + tests := []struct { + expRegex string + err error + n int + buf []byte + }{ + { + `foo value\d=10i 1481032190000000000\n`, + nil, + 35, + make([]byte, 36), + }, + { + `foo value\d=10i 148103219000000`, + nil, + 30, + make([]byte, 30), + }, + { + `0000\n`, + nil, + 5, + make([]byte, 30), + }, + { + `foo value\d=10i 1481032190000000000\n`, + nil, + 35, + make([]byte, 36), + }, + { + `foo value1=10i 1481032190000000000\n`, + io.EOF, + 35, + make([]byte, 36), + }, + { + "", + io.EOF, + 0, + make([]byte, 36), + }, + } + + for _, test := range tests { + n, err := r.Read(test.buf) + assert.Equal(t, test.n, n, test.expRegex) + re := regexp.MustCompile(test.expRegex) + assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n])) + assert.Equal(t, test.err, err, test.expRegex) + } +} + +// test split with a changing buffer size in the middle of subsequent calls +// to Read +func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + "value2": int64(10), + }, + ts, + ) + m2, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "value1": int64(10), + }, + ts, + ) + metrics := []telegraf.Metric{m1, m2} + + r := NewReader(metrics) + + tests := []struct { + expRegex string + err error + n int + buf []byte + }{ + { + `foo value\d=10i 1481032190000000000\n`, + nil, + 35, + make([]byte, 36), + }, + { + `foo value\d=10i 148103219000000`, + nil, + 30, + make([]byte, 30), + }, + { + `0000\n`, + nil, + 5, + make([]byte, 30), + }, + { + `foo value1=10i 1481032190000000000\n`, + io.EOF, + 35, + make([]byte, 36), + }, + { + "", + io.EOF, + 0, + make([]byte, 36), + }, + } + + for _, test := range tests { + n, err := r.Read(test.buf) + assert.Equal(t, test.n, n, test.expRegex) + re := regexp.MustCompile(test.expRegex) + assert.True(t, re.MatchString(string(test.buf[0:n])), string(test.buf[0:n])) + assert.Equal(t, test.err, err, test.expRegex) + } +} diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 0f426f809..05551a966 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -300,6 +300,9 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { } func (h *HTTPListener) parse(b []byte, t time.Time) error { + if !bytes.HasSuffix(b, []byte("\n")) { + b = append(b, '\n') + } metrics, err := h.parser.ParseWithDefaultTime(b, t) for _, m := range metrics { diff --git a/plugins/outputs/influxdb/client/client.go b/plugins/outputs/influxdb/client/client.go new file mode 100644 index 000000000..3f52752ad --- /dev/null +++ b/plugins/outputs/influxdb/client/client.go @@ -0,0 +1,22 @@ +package client + +import "io" + +type Client interface { + Query(command string) error + + Write(b []byte) (int, error) + WriteWithParams(b []byte, params WriteParams) (int, error) + + WriteStream(b io.Reader, contentLength int) (int, error) + WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error) + + Close() error +} + +type WriteParams struct { + Database string + RetentionPolicy string + Precision string + Consistency string +} diff --git a/plugins/outputs/influxdb/client/http.go b/plugins/outputs/influxdb/client/http.go new file mode 100644 index 000000000..68cc3e094 --- /dev/null +++ b/plugins/outputs/influxdb/client/http.go @@ -0,0 +1,258 @@ +package client + +import ( + "crypto/tls" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/url" + "time" + + "github.com/valyala/fasthttp" +) + +var ( + defaultRequestTimeout = time.Second * 5 +) + +// +func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) { + // validate required parameters: + if len(config.URL) == 0 { + return nil, fmt.Errorf("config.URL is required to create an HTTP client") + } + if len(defaultWP.Database) == 0 { + return nil, fmt.Errorf("A default database is required to create an HTTP client") + } + + // set defaults: + if config.Timeout == 0 { + config.Timeout = defaultRequestTimeout + } + + // parse URL: + u, err := url.Parse(config.URL) + if err != nil { + return nil, fmt.Errorf("error parsing config.URL: %s", err) + } + if u.Scheme != "http" && u.Scheme != "https" { + return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme) + } + + wu := writeURL(u, defaultWP) + return &httpClient{ + writeURL: []byte(wu), + config: config, + url: u, + client: &fasthttp.Client{ + TLSConfig: config.TLSConfig, + }, + }, nil +} + +type HTTPConfig struct { + // URL should be of the form "http://host:port" (REQUIRED) + URL string + + // UserAgent sets the User-Agent header. + UserAgent string + + // Timeout is the time to wait for a response to each HTTP request (writes + // and queries). + Timeout time.Duration + + // Username is the basic auth username for the server. + Username string + // Password is the basic auth password for the server. + Password string + + // TLSConfig is the tls auth settings to use for each request. + TLSConfig *tls.Config + + // Gzip, if true, compresses each payload using gzip. + // TODO + // Gzip bool +} + +// Response represents a list of statement results. +type Response struct { + // ignore Results: + Results []interface{} `json:"-"` + Err string `json:"error,omitempty"` +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (r *Response) Error() error { + if r.Err != "" { + return fmt.Errorf(r.Err) + } + return nil +} + +type httpClient struct { + writeURL []byte + config HTTPConfig + client *fasthttp.Client + url *url.URL +} + +func (c *httpClient) Query(command string) error { + req := c.makeRequest() + req.Header.SetRequestURI(queryURL(c.url, command)) + + return c.doRequest(req, fasthttp.StatusOK) +} + +func (c *httpClient) Write(b []byte) (int, error) { + req := c.makeWriteRequest(len(b), c.writeURL) + req.SetBody(b) + + err := c.doRequest(req, fasthttp.StatusNoContent) + if err == nil { + return len(b), nil + } + return 0, err +} + +func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { + req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp))) + req.SetBody(b) + + err := c.doRequest(req, fasthttp.StatusNoContent) + if err == nil { + return len(b), nil + } + return 0, err +} + +func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) { + req := c.makeWriteRequest(contentLength, c.writeURL) + req.SetBodyStream(r, contentLength) + + err := c.doRequest(req, fasthttp.StatusNoContent) + if err == nil { + return contentLength, nil + } + return 0, err +} + +func (c *httpClient) WriteStreamWithParams( + r io.Reader, + contentLength int, + wp WriteParams, +) (int, error) { + req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp))) + req.SetBodyStream(r, contentLength) + + err := c.doRequest(req, fasthttp.StatusNoContent) + if err == nil { + return contentLength, nil + } + return 0, err +} + +func (c *httpClient) doRequest( + req *fasthttp.Request, + expectedCode int, +) error { + resp := fasthttp.AcquireResponse() + + err := c.client.DoTimeout(req, resp, c.config.Timeout) + + code := resp.StatusCode() + // If it's a "no content" response, then release and return nil + if code == fasthttp.StatusNoContent { + fasthttp.ReleaseResponse(resp) + fasthttp.ReleaseRequest(req) + return nil + } + + // not a "no content" response, so parse the result: + var response Response + decErr := json.Unmarshal(resp.Body(), &response) + + // If we got a JSON decode error, send that back + if decErr != nil { + err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr) + } + // Unexpected response code OR error in JSON response body overrides + // a JSON decode error: + if code != expectedCode || response.Error() != nil { + err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]", + code, expectedCode, response.Error()) + } + + fasthttp.ReleaseResponse(resp) + fasthttp.ReleaseRequest(req) + + return err +} + +func (c *httpClient) makeWriteRequest( + contentLength int, + writeURL []byte, +) *fasthttp.Request { + req := c.makeRequest() + req.Header.SetContentLength(contentLength) + req.Header.SetRequestURIBytes(writeURL) + // TODO + // if gzip { + // req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip")) + // } + return req +} + +func (c *httpClient) makeRequest() *fasthttp.Request { + req := fasthttp.AcquireRequest() + req.Header.SetContentTypeBytes([]byte("text/plain")) + req.Header.SetMethodBytes([]byte("POST")) + req.Header.SetUserAgent(c.config.UserAgent) + if c.config.Username != "" && c.config.Password != "" { + req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password)) + } + return req +} + +func (c *httpClient) Close() error { + // Nothing to do. + return nil +} + +func writeURL(u *url.URL, wp WriteParams) string { + params := url.Values{} + params.Set("db", wp.Database) + if wp.RetentionPolicy != "" { + params.Set("rp", wp.RetentionPolicy) + } + if wp.Precision != "n" && wp.Precision != "" { + params.Set("precision", wp.Precision) + } + if wp.Consistency != "one" && wp.Consistency != "" { + params.Set("consistency", wp.Consistency) + } + + u.RawQuery = params.Encode() + u.Path = "write" + return u.String() +} + +func queryURL(u *url.URL, command string) string { + params := url.Values{} + params.Set("q", command) + + u.RawQuery = params.Encode() + u.Path = "query" + return u.String() +} + +// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt +// "To receive authorization, the httpClient sends the userid and password, +// separated by a single colon (":") character, within a base64 +// encoded string in the credentials." +// It is not meant to be urlencoded. +func basicAuth(username, password string) string { + auth := username + ":" + password + return base64.StdEncoding.EncodeToString([]byte(auth)) +} diff --git a/plugins/outputs/influxdb/client/http_test.go b/plugins/outputs/influxdb/client/http_test.go new file mode 100644 index 000000000..8fa02dd22 --- /dev/null +++ b/plugins/outputs/influxdb/client/http_test.go @@ -0,0 +1,343 @@ +package client + +import ( + "bytes" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHTTPClient_Write(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + // test form values: + if r.FormValue("db") != "test" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`) + } + if r.FormValue("rp") != "policy" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong rp name"}`) + } + if r.FormValue("precision") != "ns" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong precision"}`) + } + if r.FormValue("consistency") != "all" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong consistency"}`) + } + // test that user agent is set properly + if r.UserAgent() != "test-agent" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong agent name"}`) + } + // test basic auth params + user, pass, ok := r.BasicAuth() + if !ok { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"basic auth not set"}`) + } + if user != "test-user" || pass != "test-password" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`) + } + + // Validate Content-Length Header + if r.ContentLength != 13 { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + msg := fmt.Sprintf(`{"results":[{}],"error":"Content-Length: expected [13], got [%d]"}`, r.ContentLength) + fmt.Fprintln(w, msg) + } + + // Validate the request body: + buf := make([]byte, 100) + n, _ := r.Body.Read(buf) + expected := "cpu value=99" + got := string(buf[0 : n-1]) + if expected != got { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got) + fmt.Fprintln(w, msg) + } + + w.WriteHeader(http.StatusNoContent) + w.Header().Set("Content-Type", "application/json") + case "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + UserAgent: "test-agent", + Username: "test-user", + Password: "test-password", + } + wp := WriteParams{ + Database: "test", + RetentionPolicy: "policy", + Precision: "ns", + Consistency: "all", + } + client, err := NewHTTP(config, wp) + defer client.Close() + assert.NoError(t, err) + n, err := client.Write([]byte("cpu value=99\n")) + assert.Equal(t, 13, n) + assert.NoError(t, err) + + _, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13) + assert.NoError(t, err) +} + +func TestHTTPClient_WriteParamsOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + // test that database is set properly + if r.FormValue("db") != "override" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`) + } + + // Validate the request body: + buf := make([]byte, 100) + n, _ := r.Body.Read(buf) + expected := "cpu value=99" + got := string(buf[0 : n-1]) + if expected != got { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got) + fmt.Fprintln(w, msg) + } + + w.WriteHeader(http.StatusNoContent) + w.Header().Set("Content-Type", "application/json") + case "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + } + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + defer client.Close() + assert.NoError(t, err) + + // test that WriteWithParams overrides the default write params + wp := WriteParams{ + Database: "override", + } + n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp) + assert.Equal(t, 13, n) + assert.NoError(t, err) + + _, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp) + assert.NoError(t, err) +} + +func TestHTTPClient_Write_Errors(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusTeapot) + case "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + } + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + defer client.Close() + assert.NoError(t, err) + + lp := []byte("cpu value=99\n") + n, err := client.Write(lp) + assert.Equal(t, 0, n) + assert.Error(t, err) + + n, err = client.WriteStream(bytes.NewReader(lp), 13) + assert.Equal(t, 0, n) + assert.Error(t, err) + + wp := WriteParams{ + Database: "override", + } + n, err = client.WriteWithParams(lp, wp) + assert.Equal(t, 0, n) + assert.Error(t, err) + + n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp) + assert.Equal(t, 0, n) + assert.Error(t, err) +} + +func TestNewHTTPErrors(t *testing.T) { + // No URL: + config := HTTPConfig{} + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + assert.Error(t, err) + assert.Nil(t, client) + + // No Database: + config = HTTPConfig{ + URL: "http://localhost:8086", + } + defaultWP = WriteParams{} + client, err = NewHTTP(config, defaultWP) + assert.Nil(t, client) + assert.Error(t, err) + + // Invalid URL: + config = HTTPConfig{ + URL: "http://192.168.0.%31:8080/", + } + defaultWP = WriteParams{ + Database: "test", + } + client, err = NewHTTP(config, defaultWP) + assert.Nil(t, client) + assert.Error(t, err) + + // Invalid URL scheme: + config = HTTPConfig{ + URL: "mailto://localhost:8086", + } + defaultWP = WriteParams{ + Database: "test", + } + client, err = NewHTTP(config, defaultWP) + assert.Nil(t, client) + assert.Error(t, err) +} + +func TestHTTPClient_Query(t *testing.T) { + command := "CREATE DATABASE test" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + case "/query": + // validate the create database command is correct + got := r.FormValue("q") + if got != command { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + msg := fmt.Sprintf(`{"results":[{}],"error":"got %s, expected %s"}`, got, command) + fmt.Fprintln(w, msg) + } + + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + } + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + defer client.Close() + assert.NoError(t, err) + err = client.Query(command) + assert.NoError(t, err) +} + +func TestHTTPClient_Query_ResponseError(t *testing.T) { + command := "CREATE DATABASE test" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + case "/query": + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + msg := fmt.Sprintf(`{"results":[{}],"error":"couldnt create database"}`) + fmt.Fprintln(w, msg) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + } + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + defer client.Close() + assert.NoError(t, err) + err = client.Query(command) + assert.Error(t, err) +} + +func TestHTTPClient_Query_JSONDecodeError(t *testing.T) { + command := "CREATE DATABASE test" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + case "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + // write JSON missing a ']' + msg := fmt.Sprintf(`{"results":[{}}`) + fmt.Fprintln(w, msg) + } + })) + defer ts.Close() + + config := HTTPConfig{ + URL: ts.URL, + } + defaultWP := WriteParams{ + Database: "test", + } + client, err := NewHTTP(config, defaultWP) + defer client.Close() + assert.NoError(t, err) + err = client.Query(command) + assert.Error(t, err) + assert.Contains(t, err.Error(), "json") +} diff --git a/plugins/outputs/influxdb/client/udp.go b/plugins/outputs/influxdb/client/udp.go new file mode 100644 index 000000000..d542ecf63 --- /dev/null +++ b/plugins/outputs/influxdb/client/udp.go @@ -0,0 +1,99 @@ +package client + +import ( + "bytes" + "fmt" + "io" + "net" + "net/url" +) + +const ( + // UDPPayloadSize is a reasonable default payload size for UDP packets that + // could be travelling over the internet. + UDPPayloadSize = 512 +) + +// UDPConfig is the config data needed to create a UDP Client +type UDPConfig struct { + // URL should be of the form "udp://host:port" + // or "udp://[ipv6-host%zone]:port". + URL string + + // PayloadSize is the maximum size of a UDP client message, optional + // Tune this based on your network. Defaults to UDPPayloadSize. + PayloadSize int +} + +func NewUDP(config UDPConfig) (Client, error) { + p, err := url.Parse(config.URL) + if err != nil { + return nil, fmt.Errorf("Error parsing UDP url [%s]: %s", config.URL, err) + } + + udpAddr, err := net.ResolveUDPAddr("udp", p.Host) + if err != nil { + return nil, fmt.Errorf("Error resolving UDP Address [%s]: %s", p.Host, err) + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, fmt.Errorf("Error dialing UDP address [%s]: %s", + udpAddr.String(), err) + } + + size := config.PayloadSize + if size == 0 { + size = UDPPayloadSize + } + buf := make([]byte, size) + return &udpClient{conn: conn, buffer: buf}, nil +} + +type udpClient struct { + conn *net.UDPConn + buffer []byte +} + +func (c *udpClient) Query(command string) error { + return nil +} + +func (c *udpClient) Write(b []byte) (int, error) { + return c.WriteStream(bytes.NewReader(b), -1) +} + +// write params are ignored by the UDP client +func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { + return c.WriteStream(bytes.NewReader(b), -1) +} + +// contentLength is ignored by the UDP client. +func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { + var totaln int + for { + nR, err := r.Read(c.buffer) + if nR == 0 { + break + } + if err != io.EOF && err != nil { + return totaln, err + } + nW, err := c.conn.Write(c.buffer[0:nR]) + totaln += nW + if err != nil { + return totaln, err + } + } + return totaln, nil +} + +// contentLength is ignored by the UDP client. +// write params are ignored by the UDP client +func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { + return c.WriteStream(r, -1) +} + +func (c *udpClient) Close() error { + return c.conn.Close() +} diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go new file mode 100644 index 000000000..31196ddca --- /dev/null +++ b/plugins/outputs/influxdb/client/udp_test.go @@ -0,0 +1,163 @@ +package client + +import ( + "bytes" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + + "github.com/stretchr/testify/assert" +) + +func TestUDPClient(t *testing.T) { + config := UDPConfig{ + URL: "udp://localhost:8089", + } + client, err := NewUDP(config) + assert.NoError(t, err) + + err = client.Query("ANY QUERY RETURNS NIL") + assert.NoError(t, err) + + assert.NoError(t, client.Close()) +} + +func TestNewUDPClient_Errors(t *testing.T) { + // url.Parse Error + config := UDPConfig{ + URL: "udp://localhost%35:8089", + } + _, err := NewUDP(config) + assert.Error(t, err) + + // ResolveUDPAddr Error + config = UDPConfig{ + URL: "udp://localhost:999999", + } + _, err = NewUDP(config) + assert.Error(t, err) +} + +func TestUDPClient_Write(t *testing.T) { + config := UDPConfig{ + URL: "udp://localhost:8199", + } + client, err := NewUDP(config) + assert.NoError(t, err) + + packets := make(chan string, 100) + address, err := net.ResolveUDPAddr("udp", "localhost:8199") + assert.NoError(t, err) + listener, err := net.ListenUDP("udp", address) + defer listener.Close() + assert.NoError(t, err) + go func() { + buf := make([]byte, 200) + for { + n, _, err := listener.ReadFromUDP(buf) + if err != nil { + packets <- err.Error() + } + packets <- string(buf[0:n]) + } + }() + + // test sending simple metric + time.Sleep(time.Second) + n, err := client.Write([]byte("cpu value=99\n")) + assert.Equal(t, n, 13) + assert.NoError(t, err) + pkt := <-packets + assert.Equal(t, "cpu value=99\n", pkt) + + metrics := `cpu value=99 +cpu value=55 +cpu value=44 +cpu value=101 +cpu value=91 +cpu value=92 +` + // test sending packet with 6 metrics in a stream. + reader := bytes.NewReader([]byte(metrics)) + // contentLength is ignored: + n, err = client.WriteStream(reader, 10) + assert.Equal(t, n, len(metrics)) + assert.NoError(t, err) + pkt = <-packets + assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt) + + // + // Test that UDP packets get broken up properly: + config2 := UDPConfig{ + URL: "udp://localhost:8199", + PayloadSize: 25, + } + client2, err := NewUDP(config2) + assert.NoError(t, err) + + wp := WriteParams{} + + // + // Using Write(): + buf := []byte(metrics) + n, err = client2.WriteWithParams(buf, wp) + assert.Equal(t, n, len(metrics)) + assert.NoError(t, err) + pkt = <-packets + assert.Equal(t, "cpu value=99\ncpu value=55", pkt) + pkt = <-packets + assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt) + pkt = <-packets + assert.Equal(t, "01\ncpu value=91\ncpu value", pkt) + pkt = <-packets + assert.Equal(t, "=92\n", pkt) + + // + // Using WriteStream(): + reader = bytes.NewReader([]byte(metrics)) + n, err = client2.WriteStreamWithParams(reader, 10, wp) + assert.Equal(t, n, len(metrics)) + assert.NoError(t, err) + pkt = <-packets + assert.Equal(t, "cpu value=99\ncpu value=55", pkt) + pkt = <-packets + assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt) + pkt = <-packets + assert.Equal(t, "01\ncpu value=91\ncpu value", pkt) + pkt = <-packets + assert.Equal(t, "=92\n", pkt) + + // + // Using WriteStream() & a metric.Reader: + config3 := UDPConfig{ + URL: "udp://localhost:8199", + PayloadSize: 40, + } + client3, err := NewUDP(config3) + assert.NoError(t, err) + + now := time.Unix(1484142942, 0) + m1, _ := metric.New("test", map[string]string{}, + map[string]interface{}{"value": 1.1}, now) + m2, _ := metric.New("test", map[string]string{}, + map[string]interface{}{"value": 1.1}, now) + m3, _ := metric.New("test", map[string]string{}, + map[string]interface{}{"value": 1.1}, now) + ms := []telegraf.Metric{m1, m2, m3} + mReader := metric.NewReader(ms) + n, err = client3.WriteStreamWithParams(mReader, 10, wp) + // 3 metrics at 35 bytes each (including the newline) + assert.Equal(t, 105, n) + assert.NoError(t, err) + pkt = <-packets + assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) + pkt = <-packets + assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) + pkt = <-packets + assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) + + assert.NoError(t, client.Close()) +} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 999e1bc6f..06d8bd042 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -1,19 +1,18 @@ package influxdb import ( - "errors" "fmt" "log" "math/rand" - "net/url" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf/plugins/outputs/influxdb/client" ) type InfluxDB struct { @@ -41,7 +40,7 @@ type InfluxDB struct { // Precision is only here for legacy support. It will be ignored. Precision string - conns []client.Client + clients []client.Client } var sampleConfig = ` @@ -88,79 +87,56 @@ func (i *InfluxDB) Connect() error { urls = append(urls, i.URL) } - tlsCfg, err := internal.GetTLSConfig( + tlsConfig, err := internal.GetTLSConfig( i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify) if err != nil { return err } - var conns []client.Client for _, u := range urls { switch { case strings.HasPrefix(u, "udp"): - parsed_url, err := url.Parse(u) - if err != nil { - return err - } - - if i.UDPPayload == 0 { - i.UDPPayload = client.UDPPayloadSize - } - c, err := client.NewUDPClient(client.UDPConfig{ - Addr: parsed_url.Host, + config := client.UDPConfig{ + URL: u, PayloadSize: i.UDPPayload, - }) - if err != nil { - return err + c, err := client.NewUDP(config) } - conns = append(conns, c) + if err != nil { + return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) + } + i.clients = append(i.clients, c) default: // If URL doesn't start with "udp", assume HTTP client - c, err := client.NewHTTPClient(client.HTTPConfig{ - Addr: u, - Username: i.Username, - Password: i.Password, - UserAgent: i.UserAgent, + config := client.HTTPConfig{ + URL: u, Timeout: i.Timeout.Duration, - TLSConfig: tlsCfg, - }) - if err != nil { - return err + TLSConfig: tlsConfig, + UserAgent: i.UserAgent, } + wp := client.WriteParams{ + Database: i.Database, + RetentionPolicy: i.RetentionPolicy, + Consistency: i.WriteConsistency, + } + c, err := client.NewHTTP(config, wp) + if err != nil { + return fmt.Errorf("Error creating HTTP Client [%s]: %s", u, err) + } + i.clients = append(i.clients, c) - err = createDatabase(c, i.Database) + err = c.Query("CREATE DATABASE " + i.Database) if err != nil { log.Println("E! Database creation failed: " + err.Error()) continue } - - conns = append(conns, c) } } - i.conns = conns rand.Seed(time.Now().UnixNano()) return nil } -func createDatabase(c client.Client, database string) error { - // Create Database if it doesn't exist - _, err := c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database), - }) - return err -} - func (i *InfluxDB) Close() error { - var errS string - for j, _ := range i.conns { - if err := i.conns[j].Close(); err != nil { - errS += err.Error() - } - } - if errS != "" { - return fmt.Errorf("output influxdb close failed: %s", errS) - } return nil } @@ -175,34 +151,24 @@ func (i *InfluxDB) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(metrics []telegraf.Metric) error { - if len(i.conns) == 0 { - err := i.Connect() - if err != nil { - return err - } - } - bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: i.Database, - RetentionPolicy: i.RetentionPolicy, - WriteConsistency: i.WriteConsistency, - }) - if err != nil { - return err - } - - for _, metric := range metrics { - bp.AddPoint(metric.Point()) + bufsize := 0 + for _, m := range metrics { + bufsize += m.Len() + r := metric.NewReader(metrics) } // This will get set to nil if a successful write occurs - err = errors.New("Could not write to any InfluxDB server in cluster") + err := fmt.Errorf("Could not write to any InfluxDB server in cluster") - p := rand.Perm(len(i.conns)) + p := rand.Perm(len(i.clients)) for _, n := range p { - if e := i.conns[n].Write(bp); e != nil { - // If the database was not found, try to recreate it + if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { + // Log write failure: + log.Printf("E! InfluxDB Output Error: %s", e) + + // If the database was not found, try to recreate it: if strings.Contains(e.Error(), "database not found") { - if errc := createDatabase(i.conns[n], i.Database); errc != nil { + if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil { log.Printf("E! Error: Database %s not found and failed to recreate\n", i.Database) } @@ -225,10 +191,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } -func init() { - outputs.Add("influxdb", func() telegraf.Output { - return &InfluxDB{ - Timeout: internal.Duration{Duration: time.Second * 5}, - } - }) +func newInflux() *InfluxDB { + return &InfluxDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } +} + +func init() { + outputs.Add("influxdb", func() telegraf.Output { return newInflux() }) } diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 1414fa839..db2cd5ec7 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -20,22 +20,123 @@ func TestUDPInflux(t *testing.T) { require.NoError(t, err) err = i.Write(testutil.MockMetrics()) require.NoError(t, err) + require.NoError(t, i.Close()) } func TestHTTPInflux(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"results":[{}]}`) + switch r.URL.Path { + case "/write": + // test that database is set properly + if r.FormValue("db") != "test" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + } + // test that user agent is set properly + if r.UserAgent() != "telegraf" { + w.WriteHeader(http.StatusTeapot) + w.Header().Set("Content-Type", "application/json") + } + w.WriteHeader(http.StatusNoContent) + w.Header().Set("Content-Type", "application/json") + case "/query": + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + } })) defer ts.Close() - i := InfluxDB{ - URLs: []string{ts.URL}, - } + i := newInflux() + i.URLs = []string{ts.URL} + i.Database = "test" + i.UserAgent = "telegraf" err := i.Connect() require.NoError(t, err) err = i.Write(testutil.MockMetrics()) require.NoError(t, err) + require.NoError(t, i.Close()) +} + +func TestUDPConnectError(t *testing.T) { + i := InfluxDB{ + URLs: []string{"udp://foobar:8089"}, + } + + err := i.Connect() + require.Error(t, err) + + i = InfluxDB{ + URLs: []string{"udp://localhost:9999999"}, + } + + err = i.Connect() + require.Error(t, err) +} + +func TestHTTPConnectError_InvalidURL(t *testing.T) { + i := InfluxDB{ + URLs: []string{"http://foobar:8089"}, + } + + err := i.Connect() + require.Error(t, err) + + i = InfluxDB{ + URLs: []string{"http://localhost:9999999"}, + } + + err = i.Connect() + require.Error(t, err) +} + +func TestHTTPConnectError_DatabaseCreateFail(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/query": + w.WriteHeader(http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"test error"}`) + } + })) + defer ts.Close() + + i := InfluxDB{ + URLs: []string{ts.URL}, + Database: "test", + } + + // database creation errors do not return an error from Connect + // they are only logged. + err := i.Connect() + require.NoError(t, err) + require.NoError(t, i.Close()) +} + +func TestHTTPError_DatabaseNotFound(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`) + case "/query": + w.WriteHeader(http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`) + } + })) + defer ts.Close() + + i := InfluxDB{ + URLs: []string{ts.URL}, + Database: "test", + } + + err := i.Connect() + require.NoError(t, err) + err = i.Write(testutil.MockMetrics()) + require.Error(t, err) + require.NoError(t, i.Close()) }