diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index 1d11443ac..f82a3b344 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -35,6 +35,13 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser ## the default retention policy. Only takes effect when using HTTP. # retention_policy = "" + ## The value of this tag will be used to determine the retention policy. If this + ## tag is not set the 'retention_policy' option is used as the default. + # retention_policy_tag = "" + + ## If true, the 'retention_policy_tag' will not be removed from the metric. + # exclude_retention_policy_tag = false + ## Write consistency (clusters only), can be: "any", "one", "quorum", "all". ## Only takes effect when using HTTP. # write_consistency = "any" diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index d449c9456..b663d9198 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -83,21 +83,23 @@ func (r WriteResponse) Error() string { } type HTTPConfig struct { - URL *url.URL - UserAgent string - Timeout time.Duration - Username string - Password string - TLSConfig *tls.Config - Proxy *url.URL - Headers map[string]string - ContentEncoding string - Database string - DatabaseTag string - ExcludeDatabaseTag bool - RetentionPolicy string - Consistency string - SkipDatabaseCreation bool + URL *url.URL + UserAgent string + Timeout time.Duration + Username string + Password string + TLSConfig *tls.Config + Proxy *url.URL + Headers map[string]string + ContentEncoding string + Database string + DatabaseTag string + ExcludeDatabaseTag bool + RetentionPolicy string + RetentionPolicyTag string + ExcludeRetentionPolicyTag bool + Consistency string + SkipDatabaseCreation bool InfluxUintSupport bool `toml:"influx_uint_support"` Serializer *influx.Serializer @@ -236,55 +238,66 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error } } +type dbrp struct { + Database string + RetentionPolicy string +} + // Write sends the metrics to InfluxDB func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { - batches := make(map[string][]telegraf.Metric) - if c.config.DatabaseTag == "" { - err := c.writeBatch(ctx, c.config.Database, metrics) + // If these options are not used, we can skip in plugin batching and send + // the full batch in a single request. + if c.config.DatabaseTag == "" && c.config.RetentionPolicyTag == "" { + return c.writeBatch(ctx, c.config.Database, c.config.RetentionPolicy, metrics) + } + + batches := make(map[dbrp][]telegraf.Metric) + for _, metric := range metrics { + db, ok := metric.GetTag(c.config.DatabaseTag) + if !ok { + db = c.config.Database + } + + rp, ok := metric.GetTag(c.config.RetentionPolicyTag) + if !ok { + rp = c.config.RetentionPolicy + } + + dbrp := dbrp{ + Database: db, + RetentionPolicy: rp, + } + + if c.config.ExcludeDatabaseTag || c.config.ExcludeRetentionPolicyTag { + // Avoid modifying the metric in case we need to retry the request. + metric = metric.Copy() + metric.Accept() + metric.RemoveTag(c.config.DatabaseTag) + metric.RemoveTag(c.config.RetentionPolicyTag) + } + + batches[dbrp] = append(batches[dbrp], metric) + } + + for dbrp, batch := range batches { + if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] { + err := c.CreateDatabase(ctx, dbrp.Database) + if err != nil { + c.log.Warnf("When writing to [%s]: database %q creation failed: %v", + c.config.URL, dbrp.Database, err) + } + } + + err := c.writeBatch(ctx, dbrp.Database, dbrp.RetentionPolicy, batch) if err != nil { return err } - } else { - for _, metric := range metrics { - db, ok := metric.GetTag(c.config.DatabaseTag) - if !ok { - db = c.config.Database - } - - if _, ok := batches[db]; !ok { - batches[db] = make([]telegraf.Metric, 0) - } - - if c.config.ExcludeDatabaseTag { - // Avoid modifying the metric in case we need to retry the request. - metric = metric.Copy() - metric.Accept() - metric.RemoveTag(c.config.DatabaseTag) - } - - batches[db] = append(batches[db], metric) - } - - for db, batch := range batches { - if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] { - err := c.CreateDatabase(ctx, db) - if err != nil { - c.log.Warnf("When writing to [%s]: database %q creation failed: %v", - c.config.URL, db, err) - } - } - - err := c.writeBatch(ctx, db, batch) - if err != nil { - return err - } - } } return nil } -func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error { - url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency) +func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error { + url, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency) if err != nil { return err } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index a09b02d43..3f5ef0bc6 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -733,3 +733,200 @@ func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) { err = client.Write(ctx, metrics) require.NoError(t, err) } + +func TestDBRPTags(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + config influxdb.HTTPConfig + metrics []telegraf.Metric + handlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + url string + }{ + { + name: "defaults", + config: influxdb.HTTPConfig{ + URL: u, + Database: "telegraf", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "") + w.WriteHeader(http.StatusNoContent) + }, + }, + { + name: "static retention policy", + config: influxdb.HTTPConfig{ + URL: u, + Database: "telegraf", + RetentionPolicy: "foo", + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "foo") + w.WriteHeader(http.StatusNoContent) + }, + }, + { + name: "retention policy tag", + config: influxdb.HTTPConfig{ + URL: u, + SkipDatabaseCreation: true, + Database: "telegraf", + RetentionPolicyTag: "rp", + Log: testutil.Logger{}, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "rp": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "foo") + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu,rp=foo value=42") + w.WriteHeader(http.StatusNoContent) + }, + }, + { + name: "retention policy tag fallback to static rp", + config: influxdb.HTTPConfig{ + URL: u, + SkipDatabaseCreation: true, + Database: "telegraf", + RetentionPolicy: "foo", + RetentionPolicyTag: "rp", + Log: testutil.Logger{}, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "foo") + w.WriteHeader(http.StatusNoContent) + }, + }, + { + name: "retention policy tag fallback to unset rp", + config: influxdb.HTTPConfig{ + URL: u, + SkipDatabaseCreation: true, + Database: "telegraf", + RetentionPolicyTag: "rp", + Log: testutil.Logger{}, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "") + w.WriteHeader(http.StatusNoContent) + }, + }, + { + name: "exclude retention policy tag", + config: influxdb.HTTPConfig{ + URL: u, + SkipDatabaseCreation: true, + Database: "telegraf", + RetentionPolicyTag: "rp", + ExcludeRetentionPolicyTag: true, + Log: testutil.Logger{}, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "rp": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.FormValue("db"), "telegraf") + require.Equal(t, r.FormValue("rp"), "foo") + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42") + w.WriteHeader(http.StatusNoContent) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + tt.handlerFunc(t, w, r) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }) + + client, err := influxdb.NewHTTPClient(tt.config) + require.NoError(t, err) + + ctx := context.Background() + err = client.Write(ctx, tt.metrics) + require.NoError(t, err) + }) + } +} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index be462ba03..4306f55c6 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -31,23 +31,25 @@ type Client interface { // InfluxDB struct is the primary data structure for the plugin type InfluxDB struct { - URL string // url deprecated in 0.1.9; use urls - URLs []string `toml:"urls"` - Username string - Password string - Database string - DatabaseTag string `toml:"database_tag"` - ExcludeDatabaseTag bool `toml:"exclude_database_tag"` - UserAgent string - RetentionPolicy string - WriteConsistency string - Timeout internal.Duration - UDPPayload internal.Size `toml:"udp_payload"` - HTTPProxy string `toml:"http_proxy"` - HTTPHeaders map[string]string `toml:"http_headers"` - ContentEncoding string `toml:"content_encoding"` - SkipDatabaseCreation bool `toml:"skip_database_creation"` - InfluxUintSupport bool `toml:"influx_uint_support"` + URL string // url deprecated in 0.1.9; use urls + URLs []string `toml:"urls"` + Username string `toml:"username"` + Password string `toml:"password"` + Database string `toml:"database"` + DatabaseTag string `toml:"database_tag"` + ExcludeDatabaseTag bool `toml:"exclude_database_tag"` + RetentionPolicy string `toml:"retention_policy"` + RetentionPolicyTag string `toml:"retention_policy_tag"` + ExcludeRetentionPolicyTag bool `toml:"exclude_retention_policy_tag"` + UserAgent string `toml:"user_agent"` + WriteConsistency string `toml:"write_consistency"` + Timeout internal.Duration `toml:"timeout"` + UDPPayload internal.Size `toml:"udp_payload"` + HTTPProxy string `toml:"http_proxy"` + HTTPHeaders map[string]string `toml:"http_headers"` + ContentEncoding string `toml:"content_encoding"` + SkipDatabaseCreation bool `toml:"skip_database_creation"` + InfluxUintSupport bool `toml:"influx_uint_support"` tls.ClientConfig Precision string // precision deprecated in 1.0; value is ignored @@ -89,6 +91,13 @@ var sampleConfig = ` ## the default retention policy. Only takes effect when using HTTP. # retention_policy = "" + ## The value of this tag will be used to determine the retention policy. If this + ## tag is not set the 'retention_policy' option is used as the default. + # retention_policy_tag = "" + + ## If true, the 'retention_policy_tag' will not be removed from the metric. + # exclude_retention_policy_tag = false + ## Write consistency (clusters only), can be: "any", "one", "quorum", "all". ## Only takes effect when using HTTP. # write_consistency = "any" @@ -250,23 +259,25 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) } config := &HTTPConfig{ - URL: url, - Timeout: i.Timeout.Duration, - TLSConfig: tlsConfig, - UserAgent: i.UserAgent, - Username: i.Username, - Password: i.Password, - Proxy: proxy, - ContentEncoding: i.ContentEncoding, - Headers: i.HTTPHeaders, - Database: i.Database, - DatabaseTag: i.DatabaseTag, - ExcludeDatabaseTag: i.ExcludeDatabaseTag, - SkipDatabaseCreation: i.SkipDatabaseCreation, - RetentionPolicy: i.RetentionPolicy, - Consistency: i.WriteConsistency, - Serializer: i.newSerializer(), - Log: i.Log, + URL: url, + Timeout: i.Timeout.Duration, + TLSConfig: tlsConfig, + UserAgent: i.UserAgent, + Username: i.Username, + Password: i.Password, + Proxy: proxy, + ContentEncoding: i.ContentEncoding, + Headers: i.HTTPHeaders, + Database: i.Database, + DatabaseTag: i.DatabaseTag, + ExcludeDatabaseTag: i.ExcludeDatabaseTag, + SkipDatabaseCreation: i.SkipDatabaseCreation, + RetentionPolicy: i.RetentionPolicy, + RetentionPolicyTag: i.RetentionPolicyTag, + ExcludeRetentionPolicyTag: i.ExcludeRetentionPolicyTag, + Consistency: i.WriteConsistency, + Serializer: i.newSerializer(), + Log: i.Log, } c, err := i.CreateHTTPClientF(config)