Add new line protocol parser and serializer, influxdb output (#3924)

This commit is contained in:
Daniel Nelson
2018-03-27 17:30:51 -07:00
committed by GitHub
parent 503881d4d7
commit 1c0f63a90d
70 changed files with 26827 additions and 6533 deletions

View File

@@ -1,35 +1,40 @@
# InfluxDB Output Plugin
This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/influxdata/influxdb) HTTP or UDP service.
### Configuration:
```toml
# Configuration for influxdb server to send metrics to
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"] # UDP endpoint example
urls = ["http://127.0.0.1:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
## The target database for metrics; will be created as needed.
# database = "telegraf"
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
# retention_policy = ""
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
# write_consistency = "any"
## Timeout for HTTP messages.
# timeout = "5s"
## HTTP Basic Auth
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
## HTTP User-Agent
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
## UDP payload size is the maximum packet size to send.
# udp_payload = 512
## Optional SSL Config
@@ -39,37 +44,14 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## HTTP Proxy Config
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## Optional HTTP headers
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
```
### Required parameters:
* `urls`: List of strings, this is for InfluxDB clustering
support. On each flush interval, Telegraf will randomly choose one of the urls
to write to. Each URL should start with either `http://` or `udp://`
* `database`: The name of the database to write to.
### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb
* `password`: Password for influxdb
* `user_agent`: Set the user agent for HTTP POSTs (can be useful for log differentiation)
* `udp_payload`: Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
* `ssl_ca`: SSL CA
* `ssl_cert`: SSL CERT
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `http_proxy`: HTTP Proxy URI
* `http_headers`: HTTP headers to add to each HTTP request
* `content_encoding`: Compress each HTTP request payload using gzip if set to: "gzip"

View File

@@ -1,16 +0,0 @@
package client
import "io"
type Client interface {
Query(command string) error
WriteStream(b io.Reader) error
Close() error
}
type WriteParams struct {
Database string
RetentionPolicy string
Precision string
Consistency string
}

View File

@@ -1,277 +0,0 @@
package client
import (
"bytes"
"compress/gzip"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"time"
)
var (
defaultRequestTimeout = time.Second * 5
)
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters:
if len(config.URL) == 0 {
return nil, fmt.Errorf("config.URL is required to create an HTTP client")
}
if len(defaultWP.Database) == 0 {
return nil, fmt.Errorf("A default database is required to create an HTTP client")
}
// set defaults:
if config.Timeout == 0 {
config.Timeout = defaultRequestTimeout
}
// parse URL:
u, err := url.Parse(config.URL)
if err != nil {
return nil, fmt.Errorf("error parsing config.URL: %s", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
}
var transport http.Transport
if len(config.HTTPProxy) > 0 {
proxyURL, err := url.Parse(config.HTTPProxy)
if err != nil {
return nil, fmt.Errorf("error parsing config.HTTPProxy: %s", err)
}
transport = http.Transport{
Proxy: http.ProxyURL(proxyURL),
TLSClientConfig: config.TLSConfig,
}
} else {
transport = http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: config.TLSConfig,
}
}
return &httpClient{
writeURL: writeURL(u, defaultWP),
config: config,
url: u,
client: &http.Client{
Timeout: config.Timeout,
Transport: &transport,
},
}, nil
}
type HTTPHeaders map[string]string
type HTTPConfig struct {
// URL should be of the form "http://host:port" (REQUIRED)
URL string
// UserAgent sets the User-Agent header.
UserAgent string
// Timeout specifies a time limit for requests made by this
// Client. The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
//
// A Timeout of zero means no timeout.
Timeout time.Duration
// Username is the basic auth username for the server.
Username string
// Password is the basic auth password for the server.
Password string
// TLSConfig is the tls auth settings to use for each request.
TLSConfig *tls.Config
// Proxy URL should be of the form "http://host:port"
HTTPProxy string
// HTTP headers to append to HTTP requests.
HTTPHeaders HTTPHeaders
// The content encoding mechanism to use for each request.
ContentEncoding string
}
// Response represents a list of statement results.
type Response struct {
// ignore Results:
Results []interface{} `json:"-"`
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return fmt.Errorf(r.Err)
}
return nil
}
type httpClient struct {
writeURL string
config HTTPConfig
client *http.Client
url *url.URL
}
func (c *httpClient) Query(command string) error {
req, err := c.makeRequest(queryURL(c.url, command), bytes.NewReader([]byte("")))
if err != nil {
return err
}
return c.doRequest(req, http.StatusOK)
}
func (c *httpClient) WriteStream(r io.Reader) error {
req, err := c.makeWriteRequest(r, c.writeURL)
if err != nil {
return err
}
return c.doRequest(req, http.StatusNoContent)
}
func (c *httpClient) doRequest(
req *http.Request,
expectedCode int,
) error {
resp, err := c.client.Do(req)
if err != nil {
return err
}
code := resp.StatusCode
// If it's a "no content" response, then release and return nil
if code == http.StatusNoContent {
return nil
}
// not a "no content" response, so parse the result:
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("Fatal error reading body: %s", err)
}
decErr := json.Unmarshal(body, &response)
// If we got a JSON decode error, send that back
if decErr != nil {
err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr)
}
// Unexpected response code OR error in JSON response body overrides
// a JSON decode error:
if code != expectedCode || response.Error() != nil {
err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]",
code, expectedCode, response.Error())
}
return err
}
func (c *httpClient) makeWriteRequest(
body io.Reader,
writeURL string,
) (*http.Request, error) {
req, err := c.makeRequest(writeURL, body)
if err != nil {
return nil, err
}
if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}
return req, nil
}
func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) {
var req *http.Request
var err error
if c.config.ContentEncoding == "gzip" {
body, err = compressWithGzip(body)
if err != nil {
return nil, err
}
}
req, err = http.NewRequest("POST", uri, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
for header, value := range c.config.HTTPHeaders {
req.Header.Set(header, value)
}
req.Header.Set("User-Agent", c.config.UserAgent)
if c.config.Username != "" && c.config.Password != "" {
req.SetBasicAuth(c.config.Username, c.config.Password)
}
return req, nil
}
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error
go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()
return pr, err
}
func (c *httpClient) Close() error {
// Nothing to do.
return nil
}
func writeURL(u *url.URL, wp WriteParams) string {
params := url.Values{}
params.Set("db", wp.Database)
if wp.RetentionPolicy != "" {
params.Set("rp", wp.RetentionPolicy)
}
if wp.Precision != "n" && wp.Precision != "" {
params.Set("precision", wp.Precision)
}
if wp.Consistency != "one" && wp.Consistency != "" {
params.Set("consistency", wp.Consistency)
}
u.RawQuery = params.Encode()
p := u.Path
u.Path = path.Join(p, "write")
s := u.String()
u.Path = p
return s
}
func queryURL(u *url.URL, command string) string {
params := url.Values{}
params.Set("q", command)
u.RawQuery = params.Encode()
p := u.Path
u.Path = path.Join(p, "query")
s := u.String()
u.Path = p
return s
}

View File

@@ -1,335 +0,0 @@
package client
import (
"bytes"
"compress/gzip"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)
func TestHTTPClient_Write(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test form values:
if r.FormValue("db") != "test" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
if r.FormValue("rp") != "policy" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong rp name"}`)
}
if r.FormValue("precision") != "ns" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong precision"}`)
}
if r.FormValue("consistency") != "all" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong consistency"}`)
}
// test that user agent is set properly
if r.UserAgent() != "test-agent" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong agent name"}`)
}
// test basic auth params
user, pass, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth not set"}`)
}
if user != "test-user" || pass != "test-password" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"basic auth incorrect"}`)
}
// test that user-specified http header is set properly
if r.Header.Get("X-Test-Header") != "Test-Value" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong http header value"}`)
}
// Validate Content-Length Header
if r.ContentLength != 13 {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"Content-Length: expected [13], got [%d]"}`, r.ContentLength)
fmt.Fprintln(w, msg)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
UserAgent: "test-agent",
Username: "test-user",
Password: "test-password",
HTTPHeaders: HTTPHeaders{
"X-Test-Header": "Test-Value",
},
}
wp := WriteParams{
Database: "test",
RetentionPolicy: "policy",
Precision: "ns",
Consistency: "all",
}
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}
func TestHTTPClient_Write_Errors(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusTeapot)
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
lp := []byte("cpu value=99\n")
err = client.WriteStream(bytes.NewReader(lp))
assert.Error(t, err)
}
func TestNewHTTPErrors(t *testing.T) {
// No URL:
config := HTTPConfig{}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
assert.Error(t, err)
assert.Nil(t, client)
// No Database:
config = HTTPConfig{
URL: "http://localhost:8086",
}
defaultWP = WriteParams{}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
// Invalid URL:
config = HTTPConfig{
URL: "http://192.168.0.%31:8080/",
}
defaultWP = WriteParams{
Database: "test",
}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
// Invalid URL scheme:
config = HTTPConfig{
URL: "mailto://localhost:8086",
}
defaultWP = WriteParams{
Database: "test",
}
client, err = NewHTTP(config, defaultWP)
assert.Nil(t, client)
assert.Error(t, err)
}
func TestHTTPClient_Query(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
// validate the create database command is correct
got := r.FormValue("q")
if got != command {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"got %s, expected %s"}`, got, command)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.NoError(t, err)
}
func TestHTTPClient_Query_ResponseError(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"couldnt create database"}`)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.Error(t, err)
}
func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
command := "CREATE DATABASE test"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
// write JSON missing a ']'
msg := fmt.Sprintf(`{"results":[{}}`)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
err = client.Query(command)
assert.Error(t, err)
assert.Contains(t, err.Error(), "json")
}
func TestGzipCompression(t *testing.T) {
influxLine := "cpu value=99\n"
// Compress the payload using GZIP.
payload := bytes.NewReader([]byte(influxLine))
compressed, err := compressWithGzip(payload)
assert.Nil(t, err)
// Decompress the compressed payload and make sure
// that its original value has not changed.
gr, err := gzip.NewReader(compressed)
assert.Nil(t, err)
gr.Close()
var uncompressed bytes.Buffer
_, err = uncompressed.ReadFrom(gr)
assert.Nil(t, err)
assert.Equal(t, []byte(influxLine), uncompressed.Bytes())
}
func TestHTTPClient_PathPrefix(t *testing.T) {
prefix := "/some/random/prefix"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case prefix + "/write":
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case prefix + "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
default:
w.WriteHeader(http.StatusNotFound)
msg := fmt.Sprintf("Path not found: %s", r.URL.Path)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL + prefix,
}
wp := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
err = client.Query("CREATE DATABASE test")
assert.NoError(t, err)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}

View File

@@ -1,105 +0,0 @@
package client
import (
"fmt"
"io"
"log"
"net"
"net/url"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client
type UDPConfig struct {
// URL should be of the form "udp://host:port"
// or "udp://[ipv6-host%zone]:port".
URL string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL)
if err != nil {
return nil, fmt.Errorf("Error parsing UDP url [%s]: %s", config.URL, err)
}
udpAddr, err := net.ResolveUDPAddr("udp", p.Host)
if err != nil {
return nil, fmt.Errorf("Error resolving UDP Address [%s]: %s", p.Host, err)
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, fmt.Errorf("Error dialing UDP address [%s]: %s",
udpAddr.String(), err)
}
size := config.PayloadSize
if size == 0 {
size = UDPPayloadSize
}
buf := make([]byte, size)
return &udpClient{conn: conn, buffer: buf}, nil
}
type udpClient struct {
conn *net.UDPConn
buffer []byte
}
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error {
return nil
}
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader) error {
var totaln int
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return err
}
if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
// Scan forward until next line break to realign.
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return err
}
if c.buffer[nR-1] == uint8('\n') {
break
}
}
}
}
return nil
}
// Close will terminate the provided client connection
func (c *udpClient) Close() error {
return c.conn.Close()
}

View File

@@ -1,89 +0,0 @@
package client
import (
"net"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
func TestUDPClient(t *testing.T) {
config := UDPConfig{
URL: "udp://localhost:8089",
}
client, err := NewUDP(config)
assert.NoError(t, err)
err = client.Query("ANY QUERY RETURNS NIL")
assert.NoError(t, err)
assert.NoError(t, client.Close())
}
func TestNewUDPClient_Errors(t *testing.T) {
// url.Parse Error
config := UDPConfig{
URL: "udp://localhost%35:8089",
}
_, err := NewUDP(config)
assert.Error(t, err)
// ResolveUDPAddr Error
config = UDPConfig{
URL: "udp://localhost:999999",
}
_, err = NewUDP(config)
assert.Error(t, err)
}
func TestUDPClient_Write(t *testing.T) {
config := UDPConfig{
URL: "udp://localhost:8199",
}
client, err := NewUDP(config)
assert.NoError(t, err)
packets := make(chan string, 100)
address, err := net.ResolveUDPAddr("udp", "localhost:8199")
assert.NoError(t, err)
listener, err := net.ListenUDP("udp", address)
defer listener.Close()
assert.NoError(t, err)
go func() {
buf := make([]byte, 200)
for {
n, _, err := listener.ReadFromUDP(buf)
if err != nil {
packets <- err.Error()
}
packets <- string(buf[0:n])
}
}()
assert.NoError(t, client.Close())
config = UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client4, err := NewUDP(config)
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms := []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
err = client4.WriteStream(reader)
assert.NoError(t, err)
pkt := <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())
}

View File

@@ -0,0 +1,404 @@
package influxdb
import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
type APIErrorType int
const (
_ APIErrorType = iota
DatabaseNotFound
)
const (
defaultRequestTimeout = time.Second * 5
defaultDatabase = "telegraf"
defaultUserAgent = "telegraf"
errStringDatabaseNotFound = "database not found"
errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
errStringPartialWrite = "partial write"
errStringPointsBeyondRP = "points beyond retention policy"
errStringUnableToParse = "unable to parse"
)
var (
// Escape an identifier in InfluxQL.
escapeIdentifier = strings.NewReplacer(
"\n", `\n`,
`\`, `\\`,
`"`, `\"`,
)
)
// APIError is an error reported by the InfluxDB server
type APIError struct {
StatusCode int
Title string
Description string
Type APIErrorType
}
func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("%s: %s", e.Title, e.Description)
}
return e.Title
}
// QueryResponse is the response body from the /query endpoint
type QueryResponse struct {
Results []QueryResult `json:"results"`
}
type QueryResult struct {
Err string `json:"error,omitempty"`
}
func (r QueryResponse) Error() string {
if len(r.Results) > 0 {
return r.Results[0].Err
}
return ""
}
// WriteResponse is the response body from the /write endpoint
type WriteResponse struct {
Err string `json:"error,omitempty"`
}
func (r WriteResponse) Error() string {
return r.Err
}
type HTTPConfig struct {
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
RetentionPolicy string
Consistency string
Serializer *influx.Serializer
}
type httpClient struct {
WriteURL string
QueryURL string
ContentEncoding string
Timeout time.Duration
Username string
Password string
Headers map[string]string
client *http.Client
serializer *influx.Serializer
url *url.URL
database string
}
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
if config.URL == nil {
return nil, ErrMissingURL
}
database := config.Database
if database == "" {
database = defaultDatabase
}
timeout := config.Timeout
if timeout == 0 {
timeout = defaultRequestTimeout
}
userAgent := config.UserAgent
if userAgent == "" {
userAgent = defaultUserAgent
}
var headers = make(map[string]string, len(config.Headers)+1)
headers["User-Agent"] = userAgent
for k, v := range config.Headers {
headers[k] = v
}
var proxy func(*http.Request) (*url.URL, error)
if config.Proxy != nil {
proxy = http.ProxyURL(config.Proxy)
} else {
proxy = http.ProxyFromEnvironment
}
serializer := config.Serializer
if serializer == nil {
serializer = influx.NewSerializer()
}
writeURL := makeWriteURL(
config.URL,
database,
config.RetentionPolicy,
config.Consistency)
queryURL := makeQueryURL(config.URL)
client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
Proxy: proxy,
TLSClientConfig: config.TLSConfig,
},
},
database: database,
url: config.URL,
WriteURL: writeURL,
QueryURL: queryURL,
ContentEncoding: config.ContentEncoding,
Timeout: timeout,
Username: config.Username,
Password: config.Password,
Headers: headers,
}
return client, nil
}
// URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string {
return c.url.String()
}
// URL returns the database that this client connects too.
func (c *httpClient) Database() string {
return c.database
}
// CreateDatabase attemps to create a new database in the InfluxDB server.
// Note that some names are not allowed by the server, notably those with
// non-printable characters or slashes.
func (c *httpClient) CreateDatabase(ctx context.Context) error {
query := fmt.Sprintf(`CREATE DATABASE "%s"`,
escapeIdentifier.Replace(c.database))
req, err := c.makeQueryRequest(query)
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
queryResp := &QueryResponse{}
dec := json.NewDecoder(resp.Body)
err = dec.Decode(queryResp)
if err != nil {
if resp.StatusCode == 200 {
return nil
}
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
}
}
// Even with a 200 response there can be an error
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
return nil
}
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: queryResp.Error(),
}
}
// Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
var err error
reader := influx.NewReader(metrics, c.serializer)
req, err := c.makeWriteRequest(reader)
if err != nil {
return err
}
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNoContent {
return nil
}
writeResp := &WriteResponse{}
dec := json.NewDecoder(resp.Body)
var desc string
err = dec.Decode(writeResp)
if err == nil {
desc = writeResp.Err
}
if strings.Contains(desc, errStringDatabaseNotFound) {
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
Type: DatabaseNotFound,
}
}
// This "error" is an informational message about the state of the
// InfluxDB cluster.
if strings.Contains(desc, errStringHintedHandoffNotEmpty) {
return nil
}
// Points beyond retention policy is returned when points are immediately
// discarded for being older than the retention policy. Usually this not
// a cause for concern and we don't want to retry.
if strings.Contains(desc, errStringPointsBeyondRP) {
log.Printf("W! [outputs.influxdb]: when writing to [%s]: received error %v",
c.URL(), desc)
return nil
}
// Other partial write errors, such as "field type conflict", are not
// correctable at this point and so the point is dropped instead of
// retrying.
if strings.Contains(desc, errStringPartialWrite) {
log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points",
c.URL(), desc)
return nil
}
// This error indicates a bug in either Telegraf line protocol
// serialization, retries would not be successful.
if strings.Contains(desc, errStringUnableToParse) {
log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points",
c.URL(), desc)
return nil
}
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
}
}
func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
params := url.Values{}
params.Set("q", query)
form := strings.NewReader(params.Encode())
req, err := http.NewRequest("POST", c.QueryURL, form)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
c.addHeaders(req)
return req, nil
}
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
body, err = compressWithGzip(body)
if err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", c.WriteURL, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)
if c.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}
return req, nil
}
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error
go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()
return pr, err
}
func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" {
req.SetBasicAuth(c.Username, c.Password)
}
for header, value := range c.Headers {
req.Header.Set(header, value)
}
}
func makeWriteURL(loc *url.URL, db, rp, consistency string) string {
params := url.Values{}
params.Set("db", db)
if rp != "" {
params.Set("rp", rp)
}
if consistency != "one" && consistency != "" {
params.Set("consistency", consistency)
}
u := *loc
u.Path = path.Join(u.Path, "write")
u.RawQuery = params.Encode()
return u.String()
}
func makeQueryURL(loc *url.URL) string {
u := *loc
u.Path = path.Join(u.Path, "query")
return u.String()
}

View File

@@ -0,0 +1,558 @@
package influxdb_test
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
"github.com/stretchr/testify/require"
)
func getHTTPURL() *url.URL {
u, err := url.Parse("http://localhost")
if err != nil {
panic(err)
}
return u
}
func TestHTTP_EmptyConfig(t *testing.T) {
config := &influxdb.HTTPConfig{}
_, err := influxdb.NewHTTPClient(config)
require.Error(t, err)
require.Contains(t, err.Error(), influxdb.ErrMissingURL.Error())
}
func TestHTTP_MinimalConfig(t *testing.T) {
config := &influxdb.HTTPConfig{
URL: getHTTPURL(),
}
_, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)
}
func TestHTTP_CreateDatabase(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
successResponse := []byte(`{"results": [{"statement_id": 0}]}`)
tests := []struct {
name string
config *influxdb.HTTPConfig
database string
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error)
}{
{
name: "success",
config: &influxdb.HTTPConfig{
URL: u,
Database: "xyzzy",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q"))
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "send basic auth",
config: &influxdb.HTTPConfig{
URL: u,
Username: "guy",
Password: "smiley",
Database: "telegraf",
},
database: "telegraf",
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
username, password, ok := r.BasicAuth()
require.True(t, ok)
require.Equal(t, "guy", username)
require.Equal(t, "smiley", password)
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "send user agent",
config: &influxdb.HTTPConfig{
URL: u,
Headers: map[string]string{
"A": "B",
"C": "D",
},
Database: "telegraf",
},
database: `a " b`,
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("A"), "B")
require.Equal(t, r.Header.Get("C"), "D")
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "send headers",
config: &influxdb.HTTPConfig{
URL: u,
Headers: map[string]string{
"A": "B",
"C": "D",
},
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("A"), "B")
require.Equal(t, r.Header.Get("C"), "D")
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "database default",
config: &influxdb.HTTPConfig{
URL: u,
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, `CREATE DATABASE "telegraf"`, r.FormValue("q"))
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "database name is escaped",
config: &influxdb.HTTPConfig{
URL: u,
Database: `a " b`,
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, `CREATE DATABASE "a \" b"`, r.FormValue("q"))
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
},
{
name: "invalid database name creates api error",
config: &influxdb.HTTPConfig{
URL: u,
Database: `a \\ b`,
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
// Yes, 200 OK is the correct response...
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"results": [{"error": "invalid name", "statement_id": 0}]}`))
},
errFunc: func(t *testing.T, err error) {
expected := &influxdb.APIError{
StatusCode: 200,
Title: "200 OK",
Description: "invalid name",
}
require.Equal(t, expected, err)
},
},
{
name: "error with no response body",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
},
errFunc: func(t *testing.T, err error) {
expected := &influxdb.APIError{
StatusCode: 404,
Title: "404 Not Found",
}
require.Equal(t, expected, err)
},
},
{
name: "ok with no response body",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
tt.queryHandlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})
ctx := context.Background()
client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)
err = client.CreateDatabase(ctx)
if tt.errFunc != nil {
tt.errFunc(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestHTTP_Write(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
config *influxdb.HTTPConfig
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error)
logFunc func(t *testing.T, str string)
}{
{
name: "success",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send basic auth",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
Username: "guy",
Password: "smiley",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
username, password, ok := r.BasicAuth()
require.True(t, ok)
require.Equal(t, "guy", username)
require.Equal(t, "smiley", password)
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send user agent",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
UserAgent: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("User-Agent"), "telegraf")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "default database",
config: &influxdb.HTTPConfig{
URL: u,
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, "telegraf", r.FormValue("db"))
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send headers",
config: &influxdb.HTTPConfig{
URL: u,
Headers: map[string]string{
"A": "B",
"C": "D",
},
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("A"), "B")
require.Equal(t, r.Header.Get("C"), "D")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send retention policy",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
RetentionPolicy: "foo",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, "foo", r.FormValue("rp"))
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send consistency",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
Consistency: "all",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, "all", r.FormValue("consistency"))
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "hinted handoff not empty no log no error",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`{"error": "write failed: hinted handoff queue not empty"}`))
},
logFunc: func(t *testing.T, str string) {
require.False(t, strings.Contains(str, "hinted handoff queue not empty"))
},
},
{
name: "partial write errors are logged no error",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`{"error": "partial write: field type conflict:"}`))
},
logFunc: func(t *testing.T, str string) {
require.Contains(t, str, "partial write")
},
},
{
name: "parse errors are logged no error",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`{"error": "unable to parse 'cpu value': invalid field format"}`))
},
logFunc: func(t *testing.T, str string) {
require.Contains(t, str, "unable to parse")
},
},
{
name: "http error",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
},
errFunc: func(t *testing.T, err error) {
expected := &influxdb.APIError{
StatusCode: 502,
Title: "502 Bad Gateway",
}
require.Equal(t, expected, err)
},
},
{
name: "http error with desc",
config: &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(`{"error": "unknown error"}`))
},
errFunc: func(t *testing.T, err error) {
expected := &influxdb.APIError{
StatusCode: 503,
Title: "503 Service Unavailable",
Description: "unknown error",
}
require.Equal(t, expected, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
tt.queryHandlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})
var b bytes.Buffer
if tt.logFunc != nil {
log.SetOutput(&b)
}
ctx := context.Background()
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)
err = client.Write(ctx, metrics)
if tt.errFunc != nil {
tt.errFunc(t, err)
} else {
require.NoError(t, err)
}
if tt.logFunc != nil {
tt.logFunc(t, b.String())
}
})
}
}
func TestHTTP_WritePathPrefix(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/x/y/z/query":
w.WriteHeader(http.StatusOK)
return
case "/x/y/z/write":
w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
},
),
)
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s/x/y/z", ts.Listener.Addr().String()))
require.NoError(t, err)
ctx := context.Background()
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
config := &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
}
client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)
err = client.CreateDatabase(ctx)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
require.Equal(t, r.Header.Get("Content-Encoding"), "gzip")
gr, err := gzip.NewReader(r.Body)
require.NoError(t, err)
body, err := ioutil.ReadAll(gr)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
},
),
)
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s/", ts.Listener.Addr().String()))
require.NoError(t, err)
ctx := context.Background()
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
config := &influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
ContentEncoding: "gzip",
}
client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err)
err = client.Write(ctx, metrics)
require.NoError(t, err)
}

View File

@@ -1,29 +1,37 @@
package influxdb
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"strings"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
var (
// Quote Ident replacer.
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
defaultURL = "http://localhost:8086"
ErrMissingURL = errors.New("missing URL")
)
type Client interface {
Write(context.Context, []telegraf.Metric) error
CreateDatabase(ctx context.Context) error
URL() string
Database() string
}
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct {
// URL is only for backwards compatibility
URL string
URL string // url deprecated in 0.1.9; use urls
URLs []string `toml:"urls"`
Username string
Password string
@@ -46,36 +54,45 @@ type InfluxDB struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// Precision is only here for legacy support. It will be ignored.
Precision string
Precision string // precision deprecated in 1.0; value is ignored
clients []client.Client
clients []Client
CreateHTTPClientF func(config *HTTPConfig) (Client, error)
CreateUDPClientF func(config *UDPConfig) (Client, error)
serializer *influx.Serializer
}
var sampleConfig = `
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"] # UDP endpoint example
urls = ["http://127.0.0.1:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
## The target database for metrics; will be created as needed.
# database = "telegraf"
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
# retention_policy = ""
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
# write_consistency = "any"
## Timeout for HTTP messages.
# timeout = "5s"
## HTTP Basic Auth
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
## HTTP User-Agent
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
## UDP payload size is the maximum packet size to send.
# udp_payload = 512
## Optional SSL Config
@@ -85,170 +102,181 @@ var sampleConfig = `
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## HTTP Proxy Config
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## Optional HTTP headers
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
`
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error {
var urls []string
urls = append(urls, i.URLs...)
ctx := context.Background()
// Backward-compatibility with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list
urls := make([]string, 0, len(i.URLs))
urls = append(urls, i.URLs...)
if i.URL != "" {
urls = append(urls, i.URL)
}
tlsConfig, err := internal.GetTLSConfig(
i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
if err != nil {
return err
if len(urls) == 0 {
urls = append(urls, defaultURL)
}
i.serializer = influx.NewSerializer()
for _, u := range urls {
switch {
case strings.HasPrefix(u, "udp"):
config := client.UDPConfig{
URL: u,
PayloadSize: i.UDPPayload,
}
c, err := client.NewUDP(config)
u, err := url.Parse(u)
if err != nil {
return fmt.Errorf("error parsing url [%s]: %v", u, err)
}
var proxy *url.URL
if len(i.HTTPProxy) > 0 {
proxy, err = url.Parse(i.HTTPProxy)
if err != nil {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
return fmt.Errorf("error parsing proxy_url [%s]: %v", proxy, err)
}
}
switch u.Scheme {
case "udp", "udp4", "udp6":
c, err := i.udpClient(u)
if err != nil {
return err
}
i.clients = append(i.clients, c)
case "http", "https":
c, err := i.httpClient(ctx, u, proxy)
if err != nil {
return err
}
i.clients = append(i.clients, c)
default:
// If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{
URL: u,
Timeout: i.Timeout.Duration,
TLSConfig: tlsConfig,
UserAgent: i.UserAgent,
Username: i.Username,
Password: i.Password,
HTTPProxy: i.HTTPProxy,
HTTPHeaders: client.HTTPHeaders{},
ContentEncoding: i.ContentEncoding,
}
for header, value := range i.HTTPHeaders {
config.HTTPHeaders[header] = value
}
wp := client.WriteParams{
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
}
c, err := client.NewHTTP(config, wp)
if err != nil {
return fmt.Errorf("Error creating HTTP Client [%s]: %s", u, err)
}
i.clients = append(i.clients, c)
err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if err != nil {
if !strings.Contains(err.Error(), "Status Code [403]") {
log.Println("I! Database creation failed: " + err.Error())
}
continue
}
return fmt.Errorf("unsupported scheme [%s]: %q", u, u.Scheme)
}
}
rand.Seed(time.Now().UnixNano())
return nil
}
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error {
return nil
}
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) Description() string {
return "Configuration for sending metrics to InfluxDB"
}
func (i *InfluxDB) SampleConfig() string {
return sampleConfig
}
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
// Write sends metrics to one of the configured servers, logging each
// unsuccessful. If all servers fail, return an error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs
err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
ctx := context.Background()
var err error
p := rand.Perm(len(i.clients))
for _, n := range p {
if e := i.clients[n].WriteStream(r); e != nil {
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
client := i.clients[n]
err = client.Write(ctx, metrics)
if err == nil {
return nil
}
switch apiError := err.(type) {
case APIError:
if apiError.Type == DatabaseNotFound {
err := client.CreateDatabase(ctx)
if err != nil {
log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate",
client.URL(), client.Database())
}
}
if strings.Contains(e.Error(), "field type conflict") {
log.Printf("E! Field type conflict, dropping conflicted points: %s", e)
// setting err to nil, otherwise we will keep retrying and points
// w/ conflicting types will get stuck in the buffer forever.
err = nil
break
}
if strings.Contains(e.Error(), "points beyond retention policy") {
log.Printf("W! Points beyond retention policy: %s", e)
// This error is indicates the point is older than the
// retention policy permits, and is probably not a cause for
// concern. Retrying will not help unless the retention
// policy is modified.
err = nil
break
}
if strings.Contains(e.Error(), "unable to parse") {
log.Printf("E! Parse error; dropping points: %s", e)
// This error indicates a bug in Telegraf or InfluxDB parsing
// of line protocol. Retries will not be successful.
err = nil
break
}
if strings.Contains(e.Error(), "hinted handoff queue not empty") {
// This is an informational message
err = nil
break
}
// Log write failure
log.Printf("E! InfluxDB Output Error: %s", e)
} else {
err = nil
break
}
log.Printf("E! [outputs.influxdb]: when writing to [%s]: %v", client.URL(), err)
}
return err
return errors.New("could not write any address")
}
func newInflux() *InfluxDB {
return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
config := &UDPConfig{
URL: url,
MaxPayloadSize: i.UDPPayload,
Serializer: i.serializer,
}
c, err := i.CreateUDPClientF(config)
if err != nil {
return nil, fmt.Errorf("error creating UDP client [%s]: %v", url, err)
}
return c, nil
}
func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) (Client, error) {
tlsConfig, err := internal.GetTLSConfig(
i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
if err != nil {
return nil, err
}
config := &HTTPConfig{
URL: url,
Timeout: i.Timeout.Duration,
TLSConfig: tlsConfig,
UserAgent: i.UserAgent,
Username: i.Username,
Password: i.Password,
Proxy: proxy,
ContentEncoding: i.ContentEncoding,
Headers: i.HTTPHeaders,
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
Serializer: i.serializer,
}
c, err := i.CreateHTTPClientF(config)
if err != nil {
return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err)
}
err = c.CreateDatabase(ctx)
if err != nil {
if err, ok := err.(APIError); ok {
if err.StatusCode == 503 {
return c, nil
}
}
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
c.URL(), c.Database(), err)
}
return c, nil
}
func init() {
outputs.Add("influxdb", func() telegraf.Output { return newInflux() })
outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
CreateHTTPClientF: func(config *HTTPConfig) (Client, error) {
return NewHTTPClient(config)
},
CreateUDPClientF: func(config *UDPConfig) (Client, error) {
return NewUDPClient(config)
},
}
})
}

View File

@@ -1,313 +1,135 @@
package influxdb
package influxdb_test
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"context"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
"github.com/stretchr/testify/require"
)
func TestIdentQuoting(t *testing.T) {
var testCases = []struct {
database string
expected string
}{
{"x-y", `CREATE DATABASE "x-y"`},
{`x"y`, `CREATE DATABASE "x\"y"`},
{"x\ny", `CREATE DATABASE "x\ny"`},
{`x\y`, `CREATE DATABASE "x\\y"`},
}
for _, tc := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
q := r.Form.Get("q")
assert.Equal(t, tc.expected, q)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: tc.database,
}
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
}
func TestUDPInflux(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://localhost:8089"},
}
err := i.Connect()
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestHTTPInflux(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "test" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
}
// test that user agent is set properly
if r.UserAgent() != "telegraf" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
i := newInflux()
i.URLs = []string{ts.URL}
i.Database = "test"
i.UserAgent = "telegraf"
err := i.Connect()
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestUDPConnectError(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://foobar:8089"},
}
err := i.Connect()
require.Error(t, err)
i = InfluxDB{
URLs: []string{"udp://localhost:9999999"},
}
err = i.Connect()
require.Error(t, err)
}
func TestHTTPConnectError_InvalidURL(t *testing.T) {
i := InfluxDB{
URLs: []string{"http://foobar:8089"},
}
err := i.Connect()
require.Error(t, err)
i = InfluxDB{
URLs: []string{"http://localhost:9999999"},
}
err = i.Connect()
require.Error(t, err)
}
func TestHTTPConnectError_DatabaseCreateFail(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"test error"}`)
}
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: "test",
}
// database creation errors do not return an error from Connect
// they are only logged.
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
func TestHTTPError_DatabaseNotFound(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
case "/query":
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"database not found"}`)
}
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: "test",
}
err := i.Connect()
require.NoError(t, err)
err = i.Write(testutil.MockMetrics())
require.Error(t, err)
require.NoError(t, i.Close())
}
func TestHTTPError_WriteErrors(t *testing.T) {
var testCases = []struct {
name string
status int
contentType string
body string
err error
}{
{
// HTTP/1.1 400 Bad Request
// Content-Type: application/json
// X-Influxdb-Version: 1.3.3
//
// {
// "error": "partial write: points beyond retention policy dropped=1"
// }
name: "beyond retention policy is not an error",
status: http.StatusBadRequest,
contentType: "application/json",
body: `{"error":"partial write: points beyond retention policy dropped=1"}`,
err: nil,
},
{
// HTTP/1.1 400 Bad Request
// Content-Type: application/json
// X-Influxdb-Version: 1.3.3
//
// {
// "error": "unable to parse 'foo bar=': missing field value"
// }
name: "unable to parse is not an error",
status: http.StatusBadRequest,
contentType: "application/json",
body: `{"error":"unable to parse 'foo bar=': missing field value"}`,
err: nil,
},
{
// HTTP/1.1 400 Bad Request
// Content-Type: application/json
// X-Influxdb-Version: 1.3.3
//
// {
// "error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1"
// }
name: "field type conflict is not an error",
status: http.StatusBadRequest,
contentType: "application/json",
body: `{"error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1"}`,
err: nil,
},
{
// HTTP/1.1 500 Internal Server Error
// Content-Type: application/json
// X-Influxdb-Version: 1.3.3-c1.3.3
//
// {
// "error": "write failed: hinted handoff queue not empty"
// }
name: "hinted handoff queue not empty is not an error",
status: http.StatusInternalServerError,
contentType: "application/json",
body: `{"error":"write failed: hinted handoff queue not empty"}`,
err: nil,
},
{
// HTTP/1.1 500 Internal Server Error
// Content-Type: application/json
// X-Influxdb-Version: 1.3.3-c1.3.3
//
// {
// "error": "partial write"
// }
name: "plain partial write is an error",
status: http.StatusInternalServerError,
contentType: "application/json",
body: `{"error":"partial write"}`,
err: fmt.Errorf("Could not write to any InfluxDB server in cluster"),
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(tt.status)
rw.Header().Set("Content-Type", tt.contentType)
fmt.Fprintln(rw, tt.body)
}))
defer ts.Close()
influx := InfluxDB{
URLs: []string{ts.URL},
Database: "test",
}
err := influx.Connect()
require.NoError(t, err)
err = influx.Write(testutil.MockMetrics())
require.Equal(t, tt.err, err)
require.NoError(t, influx.Close())
})
}
}
type MockClient struct {
writeStreamCalled int
contentLength int
URLF func() string
DatabaseF func() string
WriteF func(context.Context, []telegraf.Metric) error
CreateDatabaseF func(ctx context.Context) error
}
func (m *MockClient) Query(command string) error {
panic("not implemented")
func (c *MockClient) URL() string {
return c.URLF()
}
func (m *MockClient) Write(b []byte) (int, error) {
panic("not implemented")
func (c *MockClient) Database() string {
return c.DatabaseF()
}
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
panic("not implemented")
func (c *MockClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
return c.WriteF(ctx, metrics)
}
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
m.writeStreamCalled++
m.contentLength = contentLength
return 0, nil
func (c *MockClient) CreateDatabase(ctx context.Context) error {
return c.CreateDatabaseF(ctx)
}
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
panic("not implemented")
func TestDeprecatedURLSupport(t *testing.T) {
var actual *influxdb.UDPConfig
output := influxdb.InfluxDB{
URL: "udp://localhost:8086",
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
actual = config
return &MockClient{}, nil
},
}
err := output.Connect()
require.NoError(t, err)
require.Equal(t, "udp://localhost:8086", actual.URL.String())
}
func (m *MockClient) Close() error {
panic("not implemented")
func TestDefaultURL(t *testing.T) {
var actual *influxdb.HTTPConfig
output := influxdb.InfluxDB{
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
actual = config
return &MockClient{
CreateDatabaseF: func(ctx context.Context) error {
return nil
},
}, nil
},
}
err := output.Connect()
require.NoError(t, err)
require.Equal(t, "http://localhost:8086", actual.URL.String())
}
func TestConnectUDPConfig(t *testing.T) {
var actual *influxdb.UDPConfig
output := influxdb.InfluxDB{
URLs: []string{"udp://localhost:8086"},
UDPPayload: 42,
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
actual = config
return &MockClient{}, nil
},
}
err := output.Connect()
require.NoError(t, err)
require.Equal(t, "udp://localhost:8086", actual.URL.String())
require.Equal(t, 42, actual.MaxPayloadSize)
require.NotNil(t, actual.Serializer)
}
func TestConnectHTTPConfig(t *testing.T) {
var actual *influxdb.HTTPConfig
output := influxdb.InfluxDB{
URLs: []string{"http://localhost:8089"},
Database: "telegraf",
RetentionPolicy: "default",
WriteConsistency: "any",
Timeout: internal.Duration{Duration: 5 * time.Second},
Username: "guy",
Password: "smiley",
UserAgent: "telegraf",
HTTPProxy: "http://localhost:8089",
HTTPHeaders: map[string]string{
"x": "y",
},
ContentEncoding: "gzip",
InsecureSkipVerify: true,
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
actual = config
return &MockClient{
CreateDatabaseF: func(ctx context.Context) error {
return nil
},
}, nil
},
}
err := output.Connect()
require.NoError(t, err)
require.Equal(t, output.URLs[0], actual.URL.String())
require.Equal(t, output.UserAgent, actual.UserAgent)
require.Equal(t, output.Timeout.Duration, actual.Timeout)
require.Equal(t, output.Username, actual.Username)
require.Equal(t, output.Password, actual.Password)
require.Equal(t, output.HTTPProxy, actual.Proxy.String())
require.Equal(t, output.HTTPHeaders, actual.Headers)
require.Equal(t, output.ContentEncoding, actual.ContentEncoding)
require.Equal(t, output.Database, actual.Database)
require.Equal(t, output.RetentionPolicy, actual.RetentionPolicy)
require.Equal(t, output.WriteConsistency, actual.Consistency)
require.NotNil(t, actual.TLSConfig)
require.NotNil(t, actual.Serializer)
require.Equal(t, output.Database, actual.Database)
}

View File

@@ -0,0 +1,116 @@
package influxdb
import (
"context"
"fmt"
"net"
"net/url"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
const (
// DefaultMaxPayloadSize is the maximum length of the UDP data payload
DefaultMaxPayloadSize = 512
)
type Dialer interface {
DialContext(ctx context.Context, network, address string) (Conn, error)
}
type Conn interface {
Write(b []byte) (int, error)
Close() error
}
type UDPConfig struct {
MaxPayloadSize int
URL *url.URL
Serializer serializers.Serializer
Dialer Dialer
}
func NewUDPClient(config *UDPConfig) (*udpClient, error) {
if config.URL == nil {
return nil, ErrMissingURL
}
size := config.MaxPayloadSize
if size == 0 {
size = DefaultMaxPayloadSize
}
serializer := config.Serializer
if serializer == nil {
s := influx.NewSerializer()
s.SetMaxLineBytes(config.MaxPayloadSize)
serializer = s
}
dialer := config.Dialer
if dialer == nil {
dialer = &netDialer{net.Dialer{}}
}
client := &udpClient{
url: config.URL,
serializer: serializer,
dialer: dialer,
}
return client, nil
}
type udpClient struct {
conn Conn
dialer Dialer
serializer serializers.Serializer
url *url.URL
}
func (c *udpClient) URL() string {
return c.url.String()
}
func (c *udpClient) Database() string {
return ""
}
func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
if c.conn == nil {
conn, err := c.dialer.DialContext(ctx, c.url.Scheme, c.url.Host)
if err != nil {
return fmt.Errorf("error dialing address [%s]: %s", c.url, err)
}
c.conn = conn
}
for _, metric := range metrics {
octets, err := c.serializer.Serialize(metric)
if err != nil {
return fmt.Errorf("could not serialize metric: %v", err)
}
_, err = c.conn.Write(octets)
if err != nil {
c.conn.Close()
c.conn = nil
return err
}
}
return nil
}
func (c *udpClient) CreateDatabase(ctx context.Context) error {
return nil
}
type netDialer struct {
net.Dialer
}
func (d *netDialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
return d.Dialer.DialContext(ctx, network, address)
}

View File

@@ -0,0 +1,241 @@
package influxdb_test
import (
"bytes"
"context"
"fmt"
"net"
"net/url"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require"
)
var (
metricString = "cpu value=42 0\n"
)
func getMetric() telegraf.Metric {
metric, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
if err != nil {
panic(err)
}
return metric
}
func getURL() *url.URL {
u, err := url.Parse("udp://localhost:0")
if err != nil {
panic(err)
}
return u
}
type MockConn struct {
WriteF func(b []byte) (n int, err error)
CloseF func() error
}
func (c *MockConn) Write(b []byte) (n int, err error) {
return c.WriteF(b)
}
func (c *MockConn) Close() error {
return c.CloseF()
}
type MockDialer struct {
DialContextF func(network, address string) (influxdb.Conn, error)
}
func (d *MockDialer) DialContext(ctx context.Context, network string, address string) (influxdb.Conn, error) {
return d.DialContextF(network, address)
}
type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
}
func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}
func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config)
require.Equal(t, err, influxdb.ErrMissingURL)
}
func TestUDP_URL(t *testing.T) {
u := getURL()
config := &influxdb.UDPConfig{
URL: u,
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
require.Equal(t, u.String(), client.URL())
}
func TestUDP_Simple(t *testing.T) {
var buffer bytes.Buffer
config := &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{
WriteF: func(b []byte) (n int, err error) {
buffer.Write(b)
return 0, nil
},
}
return conn, nil
},
},
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{
getMetric(),
getMetric(),
})
require.NoError(t, err)
require.Equal(t, metricString+metricString, buffer.String())
}
func TestUDP_DialError(t *testing.T) {
u, err := url.Parse("invalid://127.0.0.1:9999")
require.NoError(t, err)
config := &influxdb.UDPConfig{
URL: u,
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
return nil, fmt.Errorf(
`unsupported scheme [invalid://localhost:9999]: "invalid"`)
},
},
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{getMetric()})
require.Error(t, err)
}
func TestUDP_WriteError(t *testing.T) {
closed := false
config := &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{
WriteF: func(b []byte) (n int, err error) {
return 0, fmt.Errorf(
"write udp 127.0.0.1:52190->127.0.0.1:9999: write: connection refused")
},
CloseF: func() error {
closed = true
return nil
},
}
return conn, nil
},
},
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{getMetric()})
require.Error(t, err)
require.True(t, closed)
}
func TestUDP_SerializeError(t *testing.T) {
config := &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{}
return conn, nil
},
},
Serializer: &MockSerializer{
SerializeF: func(metric telegraf.Metric) ([]byte, error) {
return nil, influx.ErrNeedMoreSpace
},
},
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{getMetric()})
require.Error(t, err)
require.Contains(t, err.Error(), influx.ErrNeedMoreSpace.Error())
}
func TestUDP_WriteWithRealConn(t *testing.T) {
conn, err := net.ListenPacket("udp", ":0")
require.NoError(t, err)
metrics := []telegraf.Metric{
getMetric(),
getMetric(),
}
buf := make([]byte, 200)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var total int
for _, _ = range metrics {
n, _, err := conn.ReadFrom(buf[total:])
if err != nil {
break
}
total += n
}
buf = buf[:total]
}()
addr := conn.LocalAddr()
u, err := url.Parse(fmt.Sprintf("%s://%s", addr.Network(), addr))
require.NoError(t, err)
config := &influxdb.UDPConfig{
URL: u,
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, metrics)
require.NoError(t, err)
wg.Wait()
require.Equal(t, metricString+metricString, string(buf))
}