From 65b76dc74604d48236d5b4c2bb69719b419254cd Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 27 Feb 2019 10:54:02 -0800 Subject: [PATCH] Add tag based routing in influxdb/influxdb_v2 outputs (#5490) --- plugins/outputs/influxdb/README.md | 4 + plugins/outputs/influxdb/http.go | 202 ++++++++++-------- plugins/outputs/influxdb/http_test.go | 68 +++--- plugins/outputs/influxdb/influxdb.go | 58 ++--- plugins/outputs/influxdb/influxdb_test.go | 42 ++-- plugins/outputs/influxdb/udp.go | 4 +- plugins/outputs/influxdb/udp_test.go | 18 +- plugins/outputs/influxdb_v2/README.md | 4 + plugins/outputs/influxdb_v2/http.go | 63 ++++-- .../outputs/influxdb_v2/http_internal_test.go | 12 -- plugins/outputs/influxdb_v2/influxdb.go | 6 + 11 files changed, 273 insertions(+), 208 deletions(-) diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index 8a9f1a5b8..48ab3d51b 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -19,6 +19,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser ## For UDP url endpoint database needs to be configured on server side. # database = "telegraf" + ## The value of this tag will be used to determine the database. If this + ## tag is not set the 'database' option is used as the default. + # database_tag = "" + ## If true, no CREATE DATABASE queries will be sent. Set to true when using ## Telegraf with a user without permissions to create databases or when the ## database already exists. diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 5a589dc0e..43aa55ea8 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -19,13 +19,6 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/influx" ) -type APIErrorType int - -const ( - _ APIErrorType = iota - DatabaseNotFound -) - const ( defaultRequestTimeout = time.Second * 5 defaultDatabase = "telegraf" @@ -37,7 +30,6 @@ const ( ) var ( - // Escape an identifier in InfluxQL. escapeIdentifier = strings.NewReplacer( "\n", `\n`, @@ -46,12 +38,11 @@ var ( ) ) -// APIError is an error reported by the InfluxDB server +// APIError is a general error reported by the InfluxDB server type APIError struct { StatusCode int Title string Description string - Type APIErrorType } func (e APIError) Error() string { @@ -61,6 +52,11 @@ func (e APIError) Error() string { return e.Title } +type DatabaseNotFoundError struct { + APIError + Database string +} + // QueryResponse is the response body from the /query endpoint type QueryResponse struct { Results []QueryResult `json:"results"` @@ -87,51 +83,42 @@ func (r WriteResponse) Error() string { } type HTTPConfig struct { - URL *url.URL - UserAgent string - Timeout time.Duration - Username string - Password string - TLSConfig *tls.Config - Proxy *url.URL - Headers map[string]string - ContentEncoding string - Database string - RetentionPolicy string - Consistency string + URL *url.URL + UserAgent string + Timeout time.Duration + Username string + Password string + TLSConfig *tls.Config + Proxy *url.URL + Headers map[string]string + ContentEncoding string + Database string + DatabaseTag string + RetentionPolicy string + Consistency string + SkipDatabaseCreation bool InfluxUintSupport bool `toml:"influx_uint_support"` Serializer *influx.Serializer } type httpClient struct { - WriteURL string - QueryURL string - ContentEncoding string - Timeout time.Duration - Username string - Password string - Headers map[string]string - - client *http.Client - serializer *influx.Serializer - url *url.URL - database string + client *http.Client + config HTTPConfig + createdDatabases map[string]bool } -func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { +func NewHTTPClient(config HTTPConfig) (*httpClient, error) { if config.URL == nil { return nil, ErrMissingURL } - database := config.Database - if database == "" { - database = defaultDatabase + if config.Database == "" { + config.Database = defaultDatabase } - timeout := config.Timeout - if timeout == 0 { - timeout = defaultRequestTimeout + if config.Timeout == 0 { + config.Timeout = defaultRequestTimeout } userAgent := config.UserAgent @@ -139,10 +126,12 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { userAgent = "Telegraf/" + internal.Version() } - var headers = make(map[string]string, len(config.Headers)+1) - headers["User-Agent"] = userAgent + if config.Headers == nil { + config.Headers = make(map[string]string) + } + config.Headers["User-Agent"] = userAgent for k, v := range config.Headers { - headers[k] = v + config.Headers[k] = v } var proxy func(*http.Request) (*url.URL, error) @@ -152,22 +141,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { proxy = http.ProxyFromEnvironment } - serializer := config.Serializer - if serializer == nil { - serializer = influx.NewSerializer() - } - - writeURL, err := makeWriteURL( - config.URL, - database, - config.RetentionPolicy, - config.Consistency) - if err != nil { - return nil, err - } - queryURL, err := makeQueryURL(config.URL) - if err != nil { - return nil, err + if config.Serializer == nil { + config.Serializer = influx.NewSerializer() } var transport *http.Transport @@ -192,40 +167,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { } client := &httpClient{ - serializer: serializer, client: &http.Client{ - Timeout: timeout, + Timeout: config.Timeout, Transport: transport, }, - database: database, - url: config.URL, - WriteURL: writeURL, - QueryURL: queryURL, - ContentEncoding: config.ContentEncoding, - Timeout: timeout, - Username: config.Username, - Password: config.Password, - Headers: headers, + createdDatabases: make(map[string]bool), + config: config, } return client, nil } // URL returns the origin URL that this client connects too. func (c *httpClient) URL() string { - return c.url.String() + return c.config.URL.String() } -// URL returns the database that this client connects too. +// Database returns the default database that this client connects too. func (c *httpClient) Database() string { - return c.database + return c.config.Database } // CreateDatabase attempts to create a new database in the InfluxDB server. // Note that some names are not allowed by the server, notably those with // non-printable characters or slashes. -func (c *httpClient) CreateDatabase(ctx context.Context) error { +func (c *httpClient) CreateDatabase(ctx context.Context, database string) error { query := fmt.Sprintf(`CREATE DATABASE "%s"`, - escapeIdentifier.Replace(c.database)) + escapeIdentifier.Replace(database)) req, err := c.makeQueryRequest(query) @@ -241,6 +208,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error { if err != nil { if resp.StatusCode == 200 { + c.createdDatabases[database] = true return nil } @@ -252,6 +220,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error { // Even with a 200 response there can be an error if resp.StatusCode == http.StatusOK && queryResp.Error() == "" { + c.createdDatabases[database] = true return nil } @@ -264,10 +233,52 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error { // Write sends the metrics to InfluxDB func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { - var err error + batches := make(map[string][]telegraf.Metric) + if c.config.DatabaseTag == "" { + err := c.writeBatch(ctx, c.config.Database, metrics) + if err != nil { + return err + } + } else { + for _, metric := range metrics { + db, ok := metric.GetTag(c.config.DatabaseTag) + if !ok { + db = c.config.Database + } - reader := influx.NewReader(metrics, c.serializer) - req, err := c.makeWriteRequest(reader) + if _, ok := batches[db]; !ok { + batches[db] = make([]telegraf.Metric, 0) + } + + batches[db] = append(batches[db], metric) + } + + for db, batch := range batches { + if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] { + err := c.CreateDatabase(ctx, db) + if err != nil { + log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", + c.config.URL, db, err) + } + } + + err := c.writeBatch(ctx, db, batch) + if err != nil { + return err + } + } + } + return nil +} + +func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error { + url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency) + if err != nil { + return err + } + + reader := influx.NewReader(metrics, c.config.Serializer) + req, err := c.makeWriteRequest(url, reader) if err != nil { return err } @@ -292,11 +303,13 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } if strings.Contains(desc, errStringDatabaseNotFound) { - return &APIError{ - StatusCode: resp.StatusCode, - Title: resp.Status, - Description: desc, - Type: DatabaseNotFound, + return &DatabaseNotFoundError{ + APIError: APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + Description: desc, + }, + Database: db, } } @@ -340,11 +353,16 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { + queryURL, err := makeQueryURL(c.config.URL) + if err != nil { + return nil, err + } + params := url.Values{} params.Set("q", query) form := strings.NewReader(params.Encode()) - req, err := http.NewRequest("POST", c.QueryURL, form) + req, err := http.NewRequest("POST", queryURL, form) if err != nil { return nil, err } @@ -355,16 +373,16 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { return req, nil } -func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { +func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { var err error - if c.ContentEncoding == "gzip" { + if c.config.ContentEncoding == "gzip" { body, err = internal.CompressWithGzip(body) if err != nil { return nil, err } } - req, err := http.NewRequest("POST", c.WriteURL, body) + req, err := http.NewRequest("POST", url, body) if err != nil { return nil, err } @@ -372,7 +390,7 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { req.Header.Set("Content-Type", "text/plain; charset=utf-8") c.addHeaders(req) - if c.ContentEncoding == "gzip" { + if c.config.ContentEncoding == "gzip" { req.Header.Set("Content-Encoding", "gzip") } @@ -380,11 +398,11 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { } func (c *httpClient) addHeaders(req *http.Request) { - if c.Username != "" || c.Password != "" { - req.SetBasicAuth(c.Username, c.Password) + if c.config.Username != "" || c.config.Password != "" { + req.SetBasicAuth(c.config.Username, c.config.Password) } - for header, value := range c.Headers { + for header, value := range c.config.Headers { req.Header.Set(header, value) } } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index fa648f0f8..2b6b45eef 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -33,14 +33,14 @@ func getHTTPURL() *url.URL { } func TestHTTP_EmptyConfig(t *testing.T) { - config := &influxdb.HTTPConfig{} + config := influxdb.HTTPConfig{} _, err := influxdb.NewHTTPClient(config) require.Error(t, err) require.Contains(t, err.Error(), influxdb.ErrMissingURL.Error()) } func TestHTTP_MinimalConfig(t *testing.T) { - config := &influxdb.HTTPConfig{ + config := influxdb.HTTPConfig{ URL: getHTTPURL(), } _, err := influxdb.NewHTTPClient(config) @@ -48,7 +48,7 @@ func TestHTTP_MinimalConfig(t *testing.T) { } func TestHTTP_UnsupportedScheme(t *testing.T) { - config := &influxdb.HTTPConfig{ + config := influxdb.HTTPConfig{ URL: &url.URL{ Scheme: "foo", Host: "localhost", @@ -69,14 +69,14 @@ func TestHTTP_CreateDatabase(t *testing.T) { tests := []struct { name string - config *influxdb.HTTPConfig + config influxdb.HTTPConfig database string queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) errFunc func(t *testing.T, err error) }{ { name: "success", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "xyzzy", }, @@ -88,7 +88,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "send basic auth", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Username: "guy", Password: "smiley", @@ -106,7 +106,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "send user agent", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Headers: map[string]string{ "A": "B", @@ -124,7 +124,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "send headers", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Headers: map[string]string{ "A": "B", @@ -141,7 +141,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "database default", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, }, queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { @@ -152,7 +152,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "database name is escaped", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: `a " b`, }, @@ -164,7 +164,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "invalid database name creates api error", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: `a \\ b`, }, @@ -185,7 +185,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "error with no response body", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -203,7 +203,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { }, { name: "ok with no response body", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -230,7 +230,7 @@ func TestHTTP_CreateDatabase(t *testing.T) { client, err := influxdb.NewHTTPClient(tt.config) require.NoError(t, err) - err = client.CreateDatabase(ctx) + err = client.CreateDatabase(ctx, client.Database()) if tt.errFunc != nil { tt.errFunc(t, err) } else { @@ -251,14 +251,14 @@ func TestHTTP_Write(t *testing.T) { tests := []struct { name string - config *influxdb.HTTPConfig + config influxdb.HTTPConfig queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) errFunc func(t *testing.T, err error) logFunc func(t *testing.T, str string) }{ { name: "success", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -272,7 +272,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "send basic auth", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", Username: "guy", @@ -288,7 +288,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "send user agent", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", UserAgent: "telegraf", @@ -300,7 +300,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "default user agent", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -311,7 +311,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "default database", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, }, queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { @@ -321,7 +321,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "send headers", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Headers: map[string]string{ "A": "B", @@ -336,7 +336,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "send retention policy", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", RetentionPolicy: "foo", @@ -348,7 +348,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "send consistency", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", Consistency: "all", @@ -360,7 +360,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "hinted handoff not empty no log no error", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -374,7 +374,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "partial write errors are logged no error", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -388,7 +388,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "parse errors are logged no error", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -402,7 +402,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "http error", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -419,7 +419,7 @@ func TestHTTP_Write(t *testing.T) { }, { name: "http error with desc", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", }, @@ -520,14 +520,14 @@ func TestHTTP_WritePathPrefix(t *testing.T) { require.NoError(t, err) metrics := []telegraf.Metric{m} - config := &influxdb.HTTPConfig{ + config := influxdb.HTTPConfig{ URL: u, Database: "telegraf", } client, err := influxdb.NewHTTPClient(config) require.NoError(t, err) - err = client.CreateDatabase(ctx) + err = client.CreateDatabase(ctx, config.Database) require.NoError(t, err) err = client.Write(ctx, metrics) require.NoError(t, err) @@ -573,7 +573,7 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) { require.NoError(t, err) metrics := []telegraf.Metric{m} - config := &influxdb.HTTPConfig{ + config := influxdb.HTTPConfig{ URL: u, Database: "telegraf", ContentEncoding: "gzip", @@ -605,7 +605,7 @@ func TestHTTP_UnixSocket(t *testing.T) { tests := []struct { name string - config *influxdb.HTTPConfig + config influxdb.HTTPConfig database string queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) @@ -613,7 +613,7 @@ func TestHTTP_UnixSocket(t *testing.T) { }{ { name: "success", - config: &influxdb.HTTPConfig{ + config: influxdb.HTTPConfig{ URL: &url.URL{Scheme: "unix", Path: sock}, Database: "xyzzy", }, @@ -649,7 +649,7 @@ func TestHTTP_UnixSocket(t *testing.T) { client, err := influxdb.NewHTTPClient(tt.config) require.NoError(t, err) - err = client.CreateDatabase(ctx) + err = client.CreateDatabase(ctx, tt.config.Database) if tt.errFunc != nil { tt.errFunc(t, err) } else { diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index a3f2fd003..3b3e80206 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -24,10 +24,9 @@ var ( type Client interface { Write(context.Context, []telegraf.Metric) error - CreateDatabase(ctx context.Context) error - - URL() string + CreateDatabase(ctx context.Context, database string) error Database() string + URL() string } // InfluxDB struct is the primary data structure for the plugin @@ -37,6 +36,7 @@ type InfluxDB struct { Username string Password string Database string + DatabaseTag string `toml:"database_tag"` UserAgent string RetentionPolicy string WriteConsistency string @@ -72,6 +72,10 @@ var sampleConfig = ` ## For UDP url endpoint database needs to be configured on server side. # database = "telegraf" + ## The value of this tag will be used to determine the database. If this + ## tag is not set the 'database' option is used as the default. + # database_tag = "" + ## If true, no CREATE DATABASE queries will be sent. Set to true when using ## Telegraf with a user without permissions to create databases or when the ## database already exists. @@ -205,14 +209,12 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { } switch apiError := err.(type) { - case *APIError: + case *DatabaseNotFoundError: if !i.SkipDatabaseCreation { - if apiError.Type == DatabaseNotFound { - err := client.CreateDatabase(ctx) - if err != nil { - log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate", - client.URL(), client.Database()) - } + err := client.CreateDatabase(ctx, apiError.Database) + if err != nil { + log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate", + client.URL(), apiError.Database) } } } @@ -245,19 +247,21 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) } config := &HTTPConfig{ - URL: url, - Timeout: i.Timeout.Duration, - TLSConfig: tlsConfig, - UserAgent: i.UserAgent, - Username: i.Username, - Password: i.Password, - Proxy: proxy, - ContentEncoding: i.ContentEncoding, - Headers: i.HTTPHeaders, - Database: i.Database, - RetentionPolicy: i.RetentionPolicy, - Consistency: i.WriteConsistency, - Serializer: i.serializer, + URL: url, + Timeout: i.Timeout.Duration, + TLSConfig: tlsConfig, + UserAgent: i.UserAgent, + Username: i.Username, + Password: i.Password, + Proxy: proxy, + ContentEncoding: i.ContentEncoding, + Headers: i.HTTPHeaders, + Database: i.Database, + DatabaseTag: i.DatabaseTag, + SkipDatabaseCreation: i.SkipDatabaseCreation, + RetentionPolicy: i.RetentionPolicy, + Consistency: i.WriteConsistency, + Serializer: i.serializer, } c, err := i.CreateHTTPClientF(config) @@ -266,10 +270,10 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) } if !i.SkipDatabaseCreation { - err = c.CreateDatabase(ctx) + err = c.CreateDatabase(ctx, c.Database()) if err != nil { log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", - c.URL(), c.Database(), err) + c.URL(), i.Database, err) } } @@ -281,10 +285,10 @@ func init() { return &InfluxDB{ Timeout: internal.Duration{Duration: time.Second * 5}, CreateHTTPClientF: func(config *HTTPConfig) (Client, error) { - return NewHTTPClient(config) + return NewHTTPClient(*config) }, CreateUDPClientF: func(config *UDPConfig) (Client, error) { - return NewUDPClient(config) + return NewUDPClient(*config) }, } }) diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 63ecc47be..2f47d8134 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -16,25 +16,25 @@ import ( type MockClient struct { URLF func() string - DatabaseF func() string WriteF func(context.Context, []telegraf.Metric) error - CreateDatabaseF func(ctx context.Context) error + CreateDatabaseF func(ctx context.Context, database string) error + DatabaseF func() string } func (c *MockClient) URL() string { return c.URLF() } -func (c *MockClient) Database() string { - return c.DatabaseF() -} - func (c *MockClient) Write(ctx context.Context, metrics []telegraf.Metric) error { return c.WriteF(ctx, metrics) } -func (c *MockClient) CreateDatabase(ctx context.Context) error { - return c.CreateDatabaseF(ctx) +func (c *MockClient) CreateDatabase(ctx context.Context, database string) error { + return c.CreateDatabaseF(ctx, database) +} + +func (c *MockClient) Database() string { + return c.DatabaseF() } func TestDeprecatedURLSupport(t *testing.T) { @@ -58,7 +58,10 @@ func TestDefaultURL(t *testing.T) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { actual = config return &MockClient{ - CreateDatabaseF: func(ctx context.Context) error { + DatabaseF: func() string { + return "telegraf" + }, + CreateDatabaseF: func(ctx context.Context, database string) error { return nil }, }, nil @@ -113,7 +116,10 @@ func TestConnectHTTPConfig(t *testing.T) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { actual = config return &MockClient{ - CreateDatabaseF: func(ctx context.Context) error { + DatabaseF: func() string { + return "telegraf" + }, + CreateDatabaseF: func(ctx context.Context, database string) error { return nil }, }, nil @@ -145,15 +151,19 @@ func TestWriteRecreateDatabaseIfDatabaseNotFound(t *testing.T) { CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { return &MockClient{ - CreateDatabaseF: func(ctx context.Context) error { + DatabaseF: func() string { + return "telegraf" + }, + CreateDatabaseF: func(ctx context.Context, database string) error { return nil }, WriteF: func(ctx context.Context, metrics []telegraf.Metric) error { - return &influxdb.APIError{ - StatusCode: http.StatusNotFound, - Title: "404 Not Found", - Description: `database not found "telegraf"`, - Type: influxdb.DatabaseNotFound, + return &influxdb.DatabaseNotFoundError{ + APIError: influxdb.APIError{ + StatusCode: http.StatusNotFound, + Title: "404 Not Found", + Description: `database not found "telegraf"`, + }, } }, URLF: func() string { diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index 8e636d340..31c854def 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -34,7 +34,7 @@ type UDPConfig struct { Dialer Dialer } -func NewUDPClient(config *UDPConfig) (*udpClient, error) { +func NewUDPClient(config UDPConfig) (*udpClient, error) { if config.URL == nil { return nil, ErrMissingURL } @@ -113,7 +113,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error return nil } -func (c *udpClient) CreateDatabase(ctx context.Context) error { +func (c *udpClient) CreateDatabase(ctx context.Context, database string) error { return nil } diff --git a/plugins/outputs/influxdb/udp_test.go b/plugins/outputs/influxdb/udp_test.go index 2d21fd7bf..136ebb787 100644 --- a/plugins/outputs/influxdb/udp_test.go +++ b/plugins/outputs/influxdb/udp_test.go @@ -66,14 +66,14 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st } func TestUDP_NewUDPClientNoURL(t *testing.T) { - config := &influxdb.UDPConfig{} + config := influxdb.UDPConfig{} _, err := influxdb.NewUDPClient(config) require.Equal(t, err, influxdb.ErrMissingURL) } func TestUDP_URL(t *testing.T) { u := getURL() - config := &influxdb.UDPConfig{ + config := influxdb.UDPConfig{ URL: u, } @@ -86,7 +86,7 @@ func TestUDP_URL(t *testing.T) { func TestUDP_Simple(t *testing.T) { var buffer bytes.Buffer - config := &influxdb.UDPConfig{ + config := influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { @@ -117,7 +117,7 @@ func TestUDP_DialError(t *testing.T) { u, err := url.Parse("invalid://127.0.0.1:9999") require.NoError(t, err) - config := &influxdb.UDPConfig{ + config := influxdb.UDPConfig{ URL: u, Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { @@ -137,7 +137,7 @@ func TestUDP_DialError(t *testing.T) { func TestUDP_WriteError(t *testing.T) { closed := false - config := &influxdb.UDPConfig{ + config := influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { @@ -167,13 +167,13 @@ func TestUDP_WriteError(t *testing.T) { func TestUDP_ErrorLogging(t *testing.T) { tests := []struct { name string - config *influxdb.UDPConfig + config influxdb.UDPConfig metrics []telegraf.Metric logContains string }{ { name: "logs need more space", - config: &influxdb.UDPConfig{ + config: influxdb.UDPConfig{ MaxPayloadSize: 1, URL: getURL(), Dialer: &MockDialer{ @@ -188,7 +188,7 @@ func TestUDP_ErrorLogging(t *testing.T) { }, { name: "logs series name", - config: &influxdb.UDPConfig{ + config: influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { @@ -258,7 +258,7 @@ func TestUDP_WriteWithRealConn(t *testing.T) { u, err := url.Parse(fmt.Sprintf("%s://%s", addr.Network(), addr)) require.NoError(t, err) - config := &influxdb.UDPConfig{ + config := influxdb.UDPConfig{ URL: u, } client, err := influxdb.NewUDPClient(config) diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md index 245391d48..830e70b41 100644 --- a/plugins/outputs/influxdb_v2/README.md +++ b/plugins/outputs/influxdb_v2/README.md @@ -22,6 +22,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v2.x] HTTP service. ## Destination bucket to write into. bucket = "" + ## The value of this tag will be used to determine the bucket. If this + ## tag is not set the 'bucket' option is used as the default. + # bucket_tag = "" + ## Timeout for HTTP messages. # timeout = "5s" diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 8709a9b84..cdc40c148 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -20,13 +20,10 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/influx" ) -type APIErrorType int - type APIError struct { StatusCode int Title string Description string - Type APIErrorType } func (e APIError) Error() string { @@ -47,6 +44,7 @@ type HTTPConfig struct { Token string Organization string Bucket string + BucketTag string Timeout time.Duration Headers map[string]string Proxy *url.URL @@ -58,10 +56,12 @@ type HTTPConfig struct { } type httpClient struct { - WriteURL string ContentEncoding string Timeout time.Duration Headers map[string]string + Organization string + Bucket string + BucketTag string client *http.Client serializer *influx.Serializer @@ -103,14 +103,6 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { serializer = influx.NewSerializer() } - writeURL, err := makeWriteURL( - *config.URL, - config.Organization, - config.Bucket) - if err != nil { - return nil, err - } - var transport *http.Transport switch config.URL.Scheme { case "http", "https": @@ -139,10 +131,12 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { Transport: transport, }, url: config.URL, - WriteURL: writeURL, ContentEncoding: config.ContentEncoding, Timeout: timeout, Headers: headers, + Organization: config.Organization, + Bucket: config.Bucket, + BucketTag: config.BucketTag, } return client, nil } @@ -173,8 +167,45 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error if c.retryTime.After(time.Now()) { return errors.New("Retry time has not elapsed") } + + batches := make(map[string][]telegraf.Metric) + if c.BucketTag == "" { + err := c.writeBatch(ctx, c.Bucket, metrics) + if err != nil { + return err + } + } else { + for _, metric := range metrics { + bucket, ok := metric.GetTag(c.BucketTag) + if !ok { + bucket = c.Bucket + } + + if _, ok := batches[bucket]; !ok { + batches[bucket] = make([]telegraf.Metric, 0) + } + + batches[bucket] = append(batches[bucket], metric) + } + + for bucket, batch := range batches { + err := c.writeBatch(ctx, bucket, batch) + if err != nil { + return err + } + } + } + return nil +} + +func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error { + url, err := makeWriteURL(*c.url, c.Organization, bucket) + if err != nil { + return err + } + reader := influx.NewReader(metrics, c.serializer) - req, err := c.makeWriteRequest(reader) + req, err := c.makeWriteRequest(url, reader) if err != nil { return err } @@ -227,7 +258,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } } -func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { +func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { var err error if c.ContentEncoding == "gzip" { body, err = internal.CompressWithGzip(body) @@ -236,7 +267,7 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { } } - req, err := http.NewRequest("POST", c.WriteURL, body) + req, err := http.NewRequest("POST", url, body) if err != nil { return nil, err } diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go index 748519a7b..e9685da12 100644 --- a/plugins/outputs/influxdb_v2/http_internal_test.go +++ b/plugins/outputs/influxdb_v2/http_internal_test.go @@ -1,7 +1,6 @@ package influxdb_v2 import ( - "io" "net/url" "testing" @@ -46,14 +45,3 @@ func TestMakeWriteURL(t *testing.T) { } } } - -func TestMakeWriteRequest(t *testing.T) { - reader, _ := io.Pipe() - cli := httpClient{ - WriteURL: "http://localhost:9999/v2/write?bucket=telegraf&org=influx", - ContentEncoding: "gzip", - Headers: map[string]string{"x": "y"}, - } - _, err := cli.makeWriteRequest(reader) - require.NoError(t, err) -} diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index a3722a046..d0d6800a6 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -38,6 +38,10 @@ var sampleConfig = ` ## Destination bucket to write into. bucket = "" + ## The value of this tag will be used to determine the bucket. If this + ## tag is not set the 'bucket' option is used as the default. + # bucket_tag = "" + ## Timeout for HTTP messages. # timeout = "5s" @@ -77,6 +81,7 @@ type InfluxDB struct { Token string `toml:"token"` Organization string `toml:"organization"` Bucket string `toml:"bucket"` + BucketTag string `toml:"bucket_tag"` Timeout internal.Duration `toml:"timeout"` HTTPHeaders map[string]string `toml:"http_headers"` HTTPProxy string `toml:"http_proxy"` @@ -174,6 +179,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U Token: i.Token, Organization: i.Organization, Bucket: i.Bucket, + BucketTag: i.BucketTag, Timeout: i.Timeout.Duration, Headers: i.HTTPHeaders, Proxy: proxy,