Add tag based routing in influxdb/influxdb_v2 outputs (#5490)

This commit is contained in:
Daniel Nelson 2019-02-27 10:54:02 -08:00 committed by GitHub
parent 1872356103
commit 65b76dc746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 273 additions and 208 deletions

View File

@ -19,6 +19,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## For UDP url endpoint database needs to be configured on server side. ## For UDP url endpoint database needs to be configured on server side.
# database = "telegraf" # database = "telegraf"
## The value of this tag will be used to determine the database. If this
## tag is not set the 'database' option is used as the default.
# database_tag = ""
## If true, no CREATE DATABASE queries will be sent. Set to true when using ## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the ## Telegraf with a user without permissions to create databases or when the
## database already exists. ## database already exists.

View File

@ -19,13 +19,6 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
type APIErrorType int
const (
_ APIErrorType = iota
DatabaseNotFound
)
const ( const (
defaultRequestTimeout = time.Second * 5 defaultRequestTimeout = time.Second * 5
defaultDatabase = "telegraf" defaultDatabase = "telegraf"
@ -37,7 +30,6 @@ const (
) )
var ( var (
// Escape an identifier in InfluxQL. // Escape an identifier in InfluxQL.
escapeIdentifier = strings.NewReplacer( escapeIdentifier = strings.NewReplacer(
"\n", `\n`, "\n", `\n`,
@ -46,12 +38,11 @@ var (
) )
) )
// APIError is an error reported by the InfluxDB server // APIError is a general error reported by the InfluxDB server
type APIError struct { type APIError struct {
StatusCode int StatusCode int
Title string Title string
Description string Description string
Type APIErrorType
} }
func (e APIError) Error() string { func (e APIError) Error() string {
@ -61,6 +52,11 @@ func (e APIError) Error() string {
return e.Title return e.Title
} }
type DatabaseNotFoundError struct {
APIError
Database string
}
// QueryResponse is the response body from the /query endpoint // QueryResponse is the response body from the /query endpoint
type QueryResponse struct { type QueryResponse struct {
Results []QueryResult `json:"results"` Results []QueryResult `json:"results"`
@ -97,41 +93,32 @@ type HTTPConfig struct {
Headers map[string]string Headers map[string]string
ContentEncoding string ContentEncoding string
Database string Database string
DatabaseTag string
RetentionPolicy string RetentionPolicy string
Consistency string Consistency string
SkipDatabaseCreation bool
InfluxUintSupport bool `toml:"influx_uint_support"` InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer Serializer *influx.Serializer
} }
type httpClient struct { type httpClient struct {
WriteURL string
QueryURL string
ContentEncoding string
Timeout time.Duration
Username string
Password string
Headers map[string]string
client *http.Client client *http.Client
serializer *influx.Serializer config HTTPConfig
url *url.URL createdDatabases map[string]bool
database string
} }
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
if config.URL == nil { if config.URL == nil {
return nil, ErrMissingURL return nil, ErrMissingURL
} }
database := config.Database if config.Database == "" {
if database == "" { config.Database = defaultDatabase
database = defaultDatabase
} }
timeout := config.Timeout if config.Timeout == 0 {
if timeout == 0 { config.Timeout = defaultRequestTimeout
timeout = defaultRequestTimeout
} }
userAgent := config.UserAgent userAgent := config.UserAgent
@ -139,10 +126,12 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
userAgent = "Telegraf/" + internal.Version() userAgent = "Telegraf/" + internal.Version()
} }
var headers = make(map[string]string, len(config.Headers)+1) if config.Headers == nil {
headers["User-Agent"] = userAgent config.Headers = make(map[string]string)
}
config.Headers["User-Agent"] = userAgent
for k, v := range config.Headers { for k, v := range config.Headers {
headers[k] = v config.Headers[k] = v
} }
var proxy func(*http.Request) (*url.URL, error) var proxy func(*http.Request) (*url.URL, error)
@ -152,22 +141,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
proxy = http.ProxyFromEnvironment proxy = http.ProxyFromEnvironment
} }
serializer := config.Serializer if config.Serializer == nil {
if serializer == nil { config.Serializer = influx.NewSerializer()
serializer = influx.NewSerializer()
}
writeURL, err := makeWriteURL(
config.URL,
database,
config.RetentionPolicy,
config.Consistency)
if err != nil {
return nil, err
}
queryURL, err := makeQueryURL(config.URL)
if err != nil {
return nil, err
} }
var transport *http.Transport var transport *http.Transport
@ -192,40 +167,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
} }
client := &httpClient{ client := &httpClient{
serializer: serializer,
client: &http.Client{ client: &http.Client{
Timeout: timeout, Timeout: config.Timeout,
Transport: transport, Transport: transport,
}, },
database: database, createdDatabases: make(map[string]bool),
url: config.URL, config: config,
WriteURL: writeURL,
QueryURL: queryURL,
ContentEncoding: config.ContentEncoding,
Timeout: timeout,
Username: config.Username,
Password: config.Password,
Headers: headers,
} }
return client, nil return client, nil
} }
// URL returns the origin URL that this client connects too. // URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string { func (c *httpClient) URL() string {
return c.url.String() return c.config.URL.String()
} }
// URL returns the database that this client connects too. // Database returns the default database that this client connects too.
func (c *httpClient) Database() string { func (c *httpClient) Database() string {
return c.database return c.config.Database
} }
// CreateDatabase attempts to create a new database in the InfluxDB server. // CreateDatabase attempts to create a new database in the InfluxDB server.
// Note that some names are not allowed by the server, notably those with // Note that some names are not allowed by the server, notably those with
// non-printable characters or slashes. // non-printable characters or slashes.
func (c *httpClient) CreateDatabase(ctx context.Context) error { func (c *httpClient) CreateDatabase(ctx context.Context, database string) error {
query := fmt.Sprintf(`CREATE DATABASE "%s"`, query := fmt.Sprintf(`CREATE DATABASE "%s"`,
escapeIdentifier.Replace(c.database)) escapeIdentifier.Replace(database))
req, err := c.makeQueryRequest(query) req, err := c.makeQueryRequest(query)
@ -241,6 +208,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {
if err != nil { if err != nil {
if resp.StatusCode == 200 { if resp.StatusCode == 200 {
c.createdDatabases[database] = true
return nil return nil
} }
@ -252,6 +220,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {
// Even with a 200 response there can be an error // Even with a 200 response there can be an error
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" { if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
c.createdDatabases[database] = true
return nil return nil
} }
@ -264,10 +233,52 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {
// Write sends the metrics to InfluxDB // Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
var err error batches := make(map[string][]telegraf.Metric)
if c.config.DatabaseTag == "" {
err := c.writeBatch(ctx, c.config.Database, metrics)
if err != nil {
return err
}
} else {
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}
reader := influx.NewReader(metrics, c.serializer) if _, ok := batches[db]; !ok {
req, err := c.makeWriteRequest(reader) batches[db] = make([]telegraf.Metric, 0)
}
batches[db] = append(batches[db], metric)
}
for db, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] {
err := c.CreateDatabase(ctx, db)
if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
c.config.URL, db, err)
}
}
err := c.writeBatch(ctx, db, batch)
if err != nil {
return err
}
}
}
return nil
}
func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
if err != nil {
return err
}
reader := influx.NewReader(metrics, c.config.Serializer)
req, err := c.makeWriteRequest(url, reader)
if err != nil { if err != nil {
return err return err
} }
@ -292,11 +303,13 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
} }
if strings.Contains(desc, errStringDatabaseNotFound) { if strings.Contains(desc, errStringDatabaseNotFound) {
return &APIError{ return &DatabaseNotFoundError{
APIError: APIError{
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
Title: resp.Status, Title: resp.Status,
Description: desc, Description: desc,
Type: DatabaseNotFound, },
Database: db,
} }
} }
@ -340,11 +353,16 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
} }
func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
queryURL, err := makeQueryURL(c.config.URL)
if err != nil {
return nil, err
}
params := url.Values{} params := url.Values{}
params.Set("q", query) params.Set("q", query)
form := strings.NewReader(params.Encode()) form := strings.NewReader(params.Encode())
req, err := http.NewRequest("POST", c.QueryURL, form) req, err := http.NewRequest("POST", queryURL, form)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -355,16 +373,16 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
return req, nil return req, nil
} }
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error var err error
if c.ContentEncoding == "gzip" { if c.config.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body) body, err = internal.CompressWithGzip(body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
req, err := http.NewRequest("POST", c.WriteURL, body) req, err := http.NewRequest("POST", url, body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -372,7 +390,7 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
req.Header.Set("Content-Type", "text/plain; charset=utf-8") req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req) c.addHeaders(req)
if c.ContentEncoding == "gzip" { if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
} }
@ -380,11 +398,11 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
} }
func (c *httpClient) addHeaders(req *http.Request) { func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" { if c.config.Username != "" || c.config.Password != "" {
req.SetBasicAuth(c.Username, c.Password) req.SetBasicAuth(c.config.Username, c.config.Password)
} }
for header, value := range c.Headers { for header, value := range c.config.Headers {
req.Header.Set(header, value) req.Header.Set(header, value)
} }
} }

View File

@ -33,14 +33,14 @@ func getHTTPURL() *url.URL {
} }
func TestHTTP_EmptyConfig(t *testing.T) { func TestHTTP_EmptyConfig(t *testing.T) {
config := &influxdb.HTTPConfig{} config := influxdb.HTTPConfig{}
_, err := influxdb.NewHTTPClient(config) _, err := influxdb.NewHTTPClient(config)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), influxdb.ErrMissingURL.Error()) require.Contains(t, err.Error(), influxdb.ErrMissingURL.Error())
} }
func TestHTTP_MinimalConfig(t *testing.T) { func TestHTTP_MinimalConfig(t *testing.T) {
config := &influxdb.HTTPConfig{ config := influxdb.HTTPConfig{
URL: getHTTPURL(), URL: getHTTPURL(),
} }
_, err := influxdb.NewHTTPClient(config) _, err := influxdb.NewHTTPClient(config)
@ -48,7 +48,7 @@ func TestHTTP_MinimalConfig(t *testing.T) {
} }
func TestHTTP_UnsupportedScheme(t *testing.T) { func TestHTTP_UnsupportedScheme(t *testing.T) {
config := &influxdb.HTTPConfig{ config := influxdb.HTTPConfig{
URL: &url.URL{ URL: &url.URL{
Scheme: "foo", Scheme: "foo",
Host: "localhost", Host: "localhost",
@ -69,14 +69,14 @@ func TestHTTP_CreateDatabase(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config *influxdb.HTTPConfig config influxdb.HTTPConfig
database string database string
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error) errFunc func(t *testing.T, err error)
}{ }{
{ {
name: "success", name: "success",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "xyzzy", Database: "xyzzy",
}, },
@ -88,7 +88,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "send basic auth", name: "send basic auth",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Username: "guy", Username: "guy",
Password: "smiley", Password: "smiley",
@ -106,7 +106,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "send user agent", name: "send user agent",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Headers: map[string]string{ Headers: map[string]string{
"A": "B", "A": "B",
@ -124,7 +124,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "send headers", name: "send headers",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Headers: map[string]string{ Headers: map[string]string{
"A": "B", "A": "B",
@ -141,7 +141,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "database default", name: "database default",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
}, },
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
@ -152,7 +152,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "database name is escaped", name: "database name is escaped",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: `a " b`, Database: `a " b`,
}, },
@ -164,7 +164,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "invalid database name creates api error", name: "invalid database name creates api error",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: `a \\ b`, Database: `a \\ b`,
}, },
@ -185,7 +185,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "error with no response body", name: "error with no response body",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -203,7 +203,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
}, },
{ {
name: "ok with no response body", name: "ok with no response body",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -230,7 +230,7 @@ func TestHTTP_CreateDatabase(t *testing.T) {
client, err := influxdb.NewHTTPClient(tt.config) client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err) require.NoError(t, err)
err = client.CreateDatabase(ctx) err = client.CreateDatabase(ctx, client.Database())
if tt.errFunc != nil { if tt.errFunc != nil {
tt.errFunc(t, err) tt.errFunc(t, err)
} else { } else {
@ -251,14 +251,14 @@ func TestHTTP_Write(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config *influxdb.HTTPConfig config influxdb.HTTPConfig
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error) errFunc func(t *testing.T, err error)
logFunc func(t *testing.T, str string) logFunc func(t *testing.T, str string)
}{ }{
{ {
name: "success", name: "success",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -272,7 +272,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "send basic auth", name: "send basic auth",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
Username: "guy", Username: "guy",
@ -288,7 +288,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "send user agent", name: "send user agent",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
UserAgent: "telegraf", UserAgent: "telegraf",
@ -300,7 +300,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "default user agent", name: "default user agent",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -311,7 +311,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "default database", name: "default database",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
}, },
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
@ -321,7 +321,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "send headers", name: "send headers",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Headers: map[string]string{ Headers: map[string]string{
"A": "B", "A": "B",
@ -336,7 +336,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "send retention policy", name: "send retention policy",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
RetentionPolicy: "foo", RetentionPolicy: "foo",
@ -348,7 +348,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "send consistency", name: "send consistency",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
Consistency: "all", Consistency: "all",
@ -360,7 +360,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "hinted handoff not empty no log no error", name: "hinted handoff not empty no log no error",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -374,7 +374,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "partial write errors are logged no error", name: "partial write errors are logged no error",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -388,7 +388,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "parse errors are logged no error", name: "parse errors are logged no error",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -402,7 +402,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "http error", name: "http error",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -419,7 +419,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
{ {
name: "http error with desc", name: "http error with desc",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
}, },
@ -520,14 +520,14 @@ func TestHTTP_WritePathPrefix(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
metrics := []telegraf.Metric{m} metrics := []telegraf.Metric{m}
config := &influxdb.HTTPConfig{ config := influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
} }
client, err := influxdb.NewHTTPClient(config) client, err := influxdb.NewHTTPClient(config)
require.NoError(t, err) require.NoError(t, err)
err = client.CreateDatabase(ctx) err = client.CreateDatabase(ctx, config.Database)
require.NoError(t, err) require.NoError(t, err)
err = client.Write(ctx, metrics) err = client.Write(ctx, metrics)
require.NoError(t, err) require.NoError(t, err)
@ -573,7 +573,7 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
metrics := []telegraf.Metric{m} metrics := []telegraf.Metric{m}
config := &influxdb.HTTPConfig{ config := influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
ContentEncoding: "gzip", ContentEncoding: "gzip",
@ -605,7 +605,7 @@ func TestHTTP_UnixSocket(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config *influxdb.HTTPConfig config influxdb.HTTPConfig
database string database string
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
@ -613,7 +613,7 @@ func TestHTTP_UnixSocket(t *testing.T) {
}{ }{
{ {
name: "success", name: "success",
config: &influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: &url.URL{Scheme: "unix", Path: sock}, URL: &url.URL{Scheme: "unix", Path: sock},
Database: "xyzzy", Database: "xyzzy",
}, },
@ -649,7 +649,7 @@ func TestHTTP_UnixSocket(t *testing.T) {
client, err := influxdb.NewHTTPClient(tt.config) client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err) require.NoError(t, err)
err = client.CreateDatabase(ctx) err = client.CreateDatabase(ctx, tt.config.Database)
if tt.errFunc != nil { if tt.errFunc != nil {
tt.errFunc(t, err) tt.errFunc(t, err)
} else { } else {

View File

@ -24,10 +24,9 @@ var (
type Client interface { type Client interface {
Write(context.Context, []telegraf.Metric) error Write(context.Context, []telegraf.Metric) error
CreateDatabase(ctx context.Context) error CreateDatabase(ctx context.Context, database string) error
URL() string
Database() string Database() string
URL() string
} }
// InfluxDB struct is the primary data structure for the plugin // InfluxDB struct is the primary data structure for the plugin
@ -37,6 +36,7 @@ type InfluxDB struct {
Username string Username string
Password string Password string
Database string Database string
DatabaseTag string `toml:"database_tag"`
UserAgent string UserAgent string
RetentionPolicy string RetentionPolicy string
WriteConsistency string WriteConsistency string
@ -72,6 +72,10 @@ var sampleConfig = `
## For UDP url endpoint database needs to be configured on server side. ## For UDP url endpoint database needs to be configured on server side.
# database = "telegraf" # database = "telegraf"
## The value of this tag will be used to determine the database. If this
## tag is not set the 'database' option is used as the default.
# database_tag = ""
## If true, no CREATE DATABASE queries will be sent. Set to true when using ## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the ## Telegraf with a user without permissions to create databases or when the
## database already exists. ## database already exists.
@ -205,14 +209,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
} }
switch apiError := err.(type) { switch apiError := err.(type) {
case *APIError: case *DatabaseNotFoundError:
if !i.SkipDatabaseCreation { if !i.SkipDatabaseCreation {
if apiError.Type == DatabaseNotFound { err := client.CreateDatabase(ctx, apiError.Database)
err := client.CreateDatabase(ctx)
if err != nil { if err != nil {
log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate", log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate",
client.URL(), client.Database()) client.URL(), apiError.Database)
}
} }
} }
} }
@ -255,6 +257,8 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
ContentEncoding: i.ContentEncoding, ContentEncoding: i.ContentEncoding,
Headers: i.HTTPHeaders, Headers: i.HTTPHeaders,
Database: i.Database, Database: i.Database,
DatabaseTag: i.DatabaseTag,
SkipDatabaseCreation: i.SkipDatabaseCreation,
RetentionPolicy: i.RetentionPolicy, RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency, Consistency: i.WriteConsistency,
Serializer: i.serializer, Serializer: i.serializer,
@ -266,10 +270,10 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
} }
if !i.SkipDatabaseCreation { if !i.SkipDatabaseCreation {
err = c.CreateDatabase(ctx) err = c.CreateDatabase(ctx, c.Database())
if err != nil { if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
c.URL(), c.Database(), err) c.URL(), i.Database, err)
} }
} }
@ -281,10 +285,10 @@ func init() {
return &InfluxDB{ return &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5}, Timeout: internal.Duration{Duration: time.Second * 5},
CreateHTTPClientF: func(config *HTTPConfig) (Client, error) { CreateHTTPClientF: func(config *HTTPConfig) (Client, error) {
return NewHTTPClient(config) return NewHTTPClient(*config)
}, },
CreateUDPClientF: func(config *UDPConfig) (Client, error) { CreateUDPClientF: func(config *UDPConfig) (Client, error) {
return NewUDPClient(config) return NewUDPClient(*config)
}, },
} }
}) })

View File

@ -16,25 +16,25 @@ import (
type MockClient struct { type MockClient struct {
URLF func() string URLF func() string
DatabaseF func() string
WriteF func(context.Context, []telegraf.Metric) error WriteF func(context.Context, []telegraf.Metric) error
CreateDatabaseF func(ctx context.Context) error CreateDatabaseF func(ctx context.Context, database string) error
DatabaseF func() string
} }
func (c *MockClient) URL() string { func (c *MockClient) URL() string {
return c.URLF() return c.URLF()
} }
func (c *MockClient) Database() string {
return c.DatabaseF()
}
func (c *MockClient) Write(ctx context.Context, metrics []telegraf.Metric) error { func (c *MockClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
return c.WriteF(ctx, metrics) return c.WriteF(ctx, metrics)
} }
func (c *MockClient) CreateDatabase(ctx context.Context) error { func (c *MockClient) CreateDatabase(ctx context.Context, database string) error {
return c.CreateDatabaseF(ctx) return c.CreateDatabaseF(ctx, database)
}
func (c *MockClient) Database() string {
return c.DatabaseF()
} }
func TestDeprecatedURLSupport(t *testing.T) { func TestDeprecatedURLSupport(t *testing.T) {
@ -58,7 +58,10 @@ func TestDefaultURL(t *testing.T) {
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
actual = config actual = config
return &MockClient{ return &MockClient{
CreateDatabaseF: func(ctx context.Context) error { DatabaseF: func() string {
return "telegraf"
},
CreateDatabaseF: func(ctx context.Context, database string) error {
return nil return nil
}, },
}, nil }, nil
@ -113,7 +116,10 @@ func TestConnectHTTPConfig(t *testing.T) {
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
actual = config actual = config
return &MockClient{ return &MockClient{
CreateDatabaseF: func(ctx context.Context) error { DatabaseF: func() string {
return "telegraf"
},
CreateDatabaseF: func(ctx context.Context, database string) error {
return nil return nil
}, },
}, nil }, nil
@ -145,15 +151,19 @@ func TestWriteRecreateDatabaseIfDatabaseNotFound(t *testing.T) {
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return &MockClient{ return &MockClient{
CreateDatabaseF: func(ctx context.Context) error { DatabaseF: func() string {
return "telegraf"
},
CreateDatabaseF: func(ctx context.Context, database string) error {
return nil return nil
}, },
WriteF: func(ctx context.Context, metrics []telegraf.Metric) error { WriteF: func(ctx context.Context, metrics []telegraf.Metric) error {
return &influxdb.APIError{ return &influxdb.DatabaseNotFoundError{
APIError: influxdb.APIError{
StatusCode: http.StatusNotFound, StatusCode: http.StatusNotFound,
Title: "404 Not Found", Title: "404 Not Found",
Description: `database not found "telegraf"`, Description: `database not found "telegraf"`,
Type: influxdb.DatabaseNotFound, },
} }
}, },
URLF: func() string { URLF: func() string {

View File

@ -34,7 +34,7 @@ type UDPConfig struct {
Dialer Dialer Dialer Dialer
} }
func NewUDPClient(config *UDPConfig) (*udpClient, error) { func NewUDPClient(config UDPConfig) (*udpClient, error) {
if config.URL == nil { if config.URL == nil {
return nil, ErrMissingURL return nil, ErrMissingURL
} }
@ -113,7 +113,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
return nil return nil
} }
func (c *udpClient) CreateDatabase(ctx context.Context) error { func (c *udpClient) CreateDatabase(ctx context.Context, database string) error {
return nil return nil
} }

View File

@ -66,14 +66,14 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
} }
func TestUDP_NewUDPClientNoURL(t *testing.T) { func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{} config := influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config) _, err := influxdb.NewUDPClient(config)
require.Equal(t, err, influxdb.ErrMissingURL) require.Equal(t, err, influxdb.ErrMissingURL)
} }
func TestUDP_URL(t *testing.T) { func TestUDP_URL(t *testing.T) {
u := getURL() u := getURL()
config := &influxdb.UDPConfig{ config := influxdb.UDPConfig{
URL: u, URL: u,
} }
@ -86,7 +86,7 @@ func TestUDP_URL(t *testing.T) {
func TestUDP_Simple(t *testing.T) { func TestUDP_Simple(t *testing.T) {
var buffer bytes.Buffer var buffer bytes.Buffer
config := &influxdb.UDPConfig{ config := influxdb.UDPConfig{
URL: getURL(), URL: getURL(),
Dialer: &MockDialer{ Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) { DialContextF: func(network, address string) (influxdb.Conn, error) {
@ -117,7 +117,7 @@ func TestUDP_DialError(t *testing.T) {
u, err := url.Parse("invalid://127.0.0.1:9999") u, err := url.Parse("invalid://127.0.0.1:9999")
require.NoError(t, err) require.NoError(t, err)
config := &influxdb.UDPConfig{ config := influxdb.UDPConfig{
URL: u, URL: u,
Dialer: &MockDialer{ Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) { DialContextF: func(network, address string) (influxdb.Conn, error) {
@ -137,7 +137,7 @@ func TestUDP_DialError(t *testing.T) {
func TestUDP_WriteError(t *testing.T) { func TestUDP_WriteError(t *testing.T) {
closed := false closed := false
config := &influxdb.UDPConfig{ config := influxdb.UDPConfig{
URL: getURL(), URL: getURL(),
Dialer: &MockDialer{ Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) { DialContextF: func(network, address string) (influxdb.Conn, error) {
@ -167,13 +167,13 @@ func TestUDP_WriteError(t *testing.T) {
func TestUDP_ErrorLogging(t *testing.T) { func TestUDP_ErrorLogging(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config *influxdb.UDPConfig config influxdb.UDPConfig
metrics []telegraf.Metric metrics []telegraf.Metric
logContains string logContains string
}{ }{
{ {
name: "logs need more space", name: "logs need more space",
config: &influxdb.UDPConfig{ config: influxdb.UDPConfig{
MaxPayloadSize: 1, MaxPayloadSize: 1,
URL: getURL(), URL: getURL(),
Dialer: &MockDialer{ Dialer: &MockDialer{
@ -188,7 +188,7 @@ func TestUDP_ErrorLogging(t *testing.T) {
}, },
{ {
name: "logs series name", name: "logs series name",
config: &influxdb.UDPConfig{ config: influxdb.UDPConfig{
URL: getURL(), URL: getURL(),
Dialer: &MockDialer{ Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) { DialContextF: func(network, address string) (influxdb.Conn, error) {
@ -258,7 +258,7 @@ func TestUDP_WriteWithRealConn(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("%s://%s", addr.Network(), addr)) u, err := url.Parse(fmt.Sprintf("%s://%s", addr.Network(), addr))
require.NoError(t, err) require.NoError(t, err)
config := &influxdb.UDPConfig{ config := influxdb.UDPConfig{
URL: u, URL: u,
} }
client, err := influxdb.NewUDPClient(config) client, err := influxdb.NewUDPClient(config)

View File

@ -22,6 +22,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v2.x] HTTP service.
## Destination bucket to write into. ## Destination bucket to write into.
bucket = "" bucket = ""
## The value of this tag will be used to determine the bucket. If this
## tag is not set the 'bucket' option is used as the default.
# bucket_tag = ""
## Timeout for HTTP messages. ## Timeout for HTTP messages.
# timeout = "5s" # timeout = "5s"

View File

@ -20,13 +20,10 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
type APIErrorType int
type APIError struct { type APIError struct {
StatusCode int StatusCode int
Title string Title string
Description string Description string
Type APIErrorType
} }
func (e APIError) Error() string { func (e APIError) Error() string {
@ -47,6 +44,7 @@ type HTTPConfig struct {
Token string Token string
Organization string Organization string
Bucket string Bucket string
BucketTag string
Timeout time.Duration Timeout time.Duration
Headers map[string]string Headers map[string]string
Proxy *url.URL Proxy *url.URL
@ -58,10 +56,12 @@ type HTTPConfig struct {
} }
type httpClient struct { type httpClient struct {
WriteURL string
ContentEncoding string ContentEncoding string
Timeout time.Duration Timeout time.Duration
Headers map[string]string Headers map[string]string
Organization string
Bucket string
BucketTag string
client *http.Client client *http.Client
serializer *influx.Serializer serializer *influx.Serializer
@ -103,14 +103,6 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
serializer = influx.NewSerializer() serializer = influx.NewSerializer()
} }
writeURL, err := makeWriteURL(
*config.URL,
config.Organization,
config.Bucket)
if err != nil {
return nil, err
}
var transport *http.Transport var transport *http.Transport
switch config.URL.Scheme { switch config.URL.Scheme {
case "http", "https": case "http", "https":
@ -139,10 +131,12 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
Transport: transport, Transport: transport,
}, },
url: config.URL, url: config.URL,
WriteURL: writeURL,
ContentEncoding: config.ContentEncoding, ContentEncoding: config.ContentEncoding,
Timeout: timeout, Timeout: timeout,
Headers: headers, Headers: headers,
Organization: config.Organization,
Bucket: config.Bucket,
BucketTag: config.BucketTag,
} }
return client, nil return client, nil
} }
@ -173,8 +167,45 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
if c.retryTime.After(time.Now()) { if c.retryTime.After(time.Now()) {
return errors.New("Retry time has not elapsed") return errors.New("Retry time has not elapsed")
} }
batches := make(map[string][]telegraf.Metric)
if c.BucketTag == "" {
err := c.writeBatch(ctx, c.Bucket, metrics)
if err != nil {
return err
}
} else {
for _, metric := range metrics {
bucket, ok := metric.GetTag(c.BucketTag)
if !ok {
bucket = c.Bucket
}
if _, ok := batches[bucket]; !ok {
batches[bucket] = make([]telegraf.Metric, 0)
}
batches[bucket] = append(batches[bucket], metric)
}
for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err != nil {
return err
}
}
}
return nil
}
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(*c.url, c.Organization, bucket)
if err != nil {
return err
}
reader := influx.NewReader(metrics, c.serializer) reader := influx.NewReader(metrics, c.serializer)
req, err := c.makeWriteRequest(reader) req, err := c.makeWriteRequest(url, reader)
if err != nil { if err != nil {
return err return err
} }
@ -227,7 +258,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
} }
} }
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error var err error
if c.ContentEncoding == "gzip" { if c.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body) body, err = internal.CompressWithGzip(body)
@ -236,7 +267,7 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
} }
} }
req, err := http.NewRequest("POST", c.WriteURL, body) req, err := http.NewRequest("POST", url, body)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,7 +1,6 @@
package influxdb_v2 package influxdb_v2
import ( import (
"io"
"net/url" "net/url"
"testing" "testing"
@ -46,14 +45,3 @@ func TestMakeWriteURL(t *testing.T) {
} }
} }
} }
func TestMakeWriteRequest(t *testing.T) {
reader, _ := io.Pipe()
cli := httpClient{
WriteURL: "http://localhost:9999/v2/write?bucket=telegraf&org=influx",
ContentEncoding: "gzip",
Headers: map[string]string{"x": "y"},
}
_, err := cli.makeWriteRequest(reader)
require.NoError(t, err)
}

View File

@ -38,6 +38,10 @@ var sampleConfig = `
## Destination bucket to write into. ## Destination bucket to write into.
bucket = "" bucket = ""
## The value of this tag will be used to determine the bucket. If this
## tag is not set the 'bucket' option is used as the default.
# bucket_tag = ""
## Timeout for HTTP messages. ## Timeout for HTTP messages.
# timeout = "5s" # timeout = "5s"
@ -77,6 +81,7 @@ type InfluxDB struct {
Token string `toml:"token"` Token string `toml:"token"`
Organization string `toml:"organization"` Organization string `toml:"organization"`
Bucket string `toml:"bucket"` Bucket string `toml:"bucket"`
BucketTag string `toml:"bucket_tag"`
Timeout internal.Duration `toml:"timeout"` Timeout internal.Duration `toml:"timeout"`
HTTPHeaders map[string]string `toml:"http_headers"` HTTPHeaders map[string]string `toml:"http_headers"`
HTTPProxy string `toml:"http_proxy"` HTTPProxy string `toml:"http_proxy"`
@ -174,6 +179,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
Token: i.Token, Token: i.Token,
Organization: i.Organization, Organization: i.Organization,
Bucket: i.Bucket, Bucket: i.Bucket,
BucketTag: i.BucketTag,
Timeout: i.Timeout.Duration, Timeout: i.Timeout.Duration,
Headers: i.HTTPHeaders, Headers: i.HTTPHeaders,
Proxy: proxy, Proxy: proxy,