Add support for setting retention policy using tag (#7141)

This commit is contained in:
Daniel Nelson 2020-03-10 15:20:03 -07:00 committed by GitHub
parent c50b02e58d
commit c7146be2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 317 additions and 89 deletions

View File

@ -35,6 +35,13 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## the default retention policy. Only takes effect when using HTTP.
# retention_policy = ""
## The value of this tag will be used to determine the retention policy. If this
## tag is not set the 'retention_policy' option is used as the default.
# retention_policy_tag = ""
## If true, the 'retention_policy_tag' will not be removed from the metric.
# exclude_retention_policy_tag = false
## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
## Only takes effect when using HTTP.
# write_consistency = "any"

View File

@ -83,21 +83,23 @@ func (r WriteResponse) Error() string {
}
type HTTPConfig struct {
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
ExcludeDatabaseTag bool
RetentionPolicy string
Consistency string
SkipDatabaseCreation bool
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
ExcludeDatabaseTag bool
RetentionPolicy string
RetentionPolicyTag string
ExcludeRetentionPolicyTag bool
Consistency string
SkipDatabaseCreation bool
InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
@ -236,55 +238,66 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
}
}
type dbrp struct {
Database string
RetentionPolicy string
}
// Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
batches := make(map[string][]telegraf.Metric)
if c.config.DatabaseTag == "" {
err := c.writeBatch(ctx, c.config.Database, metrics)
// If these options are not used, we can skip in plugin batching and send
// the full batch in a single request.
if c.config.DatabaseTag == "" && c.config.RetentionPolicyTag == "" {
return c.writeBatch(ctx, c.config.Database, c.config.RetentionPolicy, metrics)
}
batches := make(map[dbrp][]telegraf.Metric)
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}
rp, ok := metric.GetTag(c.config.RetentionPolicyTag)
if !ok {
rp = c.config.RetentionPolicy
}
dbrp := dbrp{
Database: db,
RetentionPolicy: rp,
}
if c.config.ExcludeDatabaseTag || c.config.ExcludeRetentionPolicyTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.config.DatabaseTag)
metric.RemoveTag(c.config.RetentionPolicyTag)
}
batches[dbrp] = append(batches[dbrp], metric)
}
for dbrp, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] {
err := c.CreateDatabase(ctx, dbrp.Database)
if err != nil {
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
c.config.URL, dbrp.Database, err)
}
}
err := c.writeBatch(ctx, dbrp.Database, dbrp.RetentionPolicy, batch)
if err != nil {
return err
}
} else {
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}
if _, ok := batches[db]; !ok {
batches[db] = make([]telegraf.Metric, 0)
}
if c.config.ExcludeDatabaseTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.config.DatabaseTag)
}
batches[db] = append(batches[db], metric)
}
for db, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] {
err := c.CreateDatabase(ctx, db)
if err != nil {
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
c.config.URL, db, err)
}
}
err := c.writeBatch(ctx, db, batch)
if err != nil {
return err
}
}
}
return nil
}
func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
if err != nil {
return err
}

View File

@ -733,3 +733,200 @@ func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) {
err = client.Write(ctx, metrics)
require.NoError(t, err)
}
func TestDBRPTags(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
tests := []struct {
name string
config influxdb.HTTPConfig
metrics []telegraf.Metric
handlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
url string
}{
{
name: "defaults",
config: influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "static retention policy",
config: influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
RetentionPolicy: "foo",
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"rp": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu,rp=foo value=42")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag fallback to static rp",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicy: "foo",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag fallback to unset rp",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "exclude retention policy tag",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
ExcludeRetentionPolicyTag: true,
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"rp": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
tt.handlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})
client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)
ctx := context.Background()
err = client.Write(ctx, tt.metrics)
require.NoError(t, err)
})
}
}

View File

@ -31,23 +31,25 @@ type Client interface {
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct {
URL string // url deprecated in 0.1.9; use urls
URLs []string `toml:"urls"`
Username string
Password string
Database string
DatabaseTag string `toml:"database_tag"`
ExcludeDatabaseTag bool `toml:"exclude_database_tag"`
UserAgent string
RetentionPolicy string
WriteConsistency string
Timeout internal.Duration
UDPPayload internal.Size `toml:"udp_payload"`
HTTPProxy string `toml:"http_proxy"`
HTTPHeaders map[string]string `toml:"http_headers"`
ContentEncoding string `toml:"content_encoding"`
SkipDatabaseCreation bool `toml:"skip_database_creation"`
InfluxUintSupport bool `toml:"influx_uint_support"`
URL string // url deprecated in 0.1.9; use urls
URLs []string `toml:"urls"`
Username string `toml:"username"`
Password string `toml:"password"`
Database string `toml:"database"`
DatabaseTag string `toml:"database_tag"`
ExcludeDatabaseTag bool `toml:"exclude_database_tag"`
RetentionPolicy string `toml:"retention_policy"`
RetentionPolicyTag string `toml:"retention_policy_tag"`
ExcludeRetentionPolicyTag bool `toml:"exclude_retention_policy_tag"`
UserAgent string `toml:"user_agent"`
WriteConsistency string `toml:"write_consistency"`
Timeout internal.Duration `toml:"timeout"`
UDPPayload internal.Size `toml:"udp_payload"`
HTTPProxy string `toml:"http_proxy"`
HTTPHeaders map[string]string `toml:"http_headers"`
ContentEncoding string `toml:"content_encoding"`
SkipDatabaseCreation bool `toml:"skip_database_creation"`
InfluxUintSupport bool `toml:"influx_uint_support"`
tls.ClientConfig
Precision string // precision deprecated in 1.0; value is ignored
@ -89,6 +91,13 @@ var sampleConfig = `
## the default retention policy. Only takes effect when using HTTP.
# retention_policy = ""
## The value of this tag will be used to determine the retention policy. If this
## tag is not set the 'retention_policy' option is used as the default.
# retention_policy_tag = ""
## If true, the 'retention_policy_tag' will not be removed from the metric.
# exclude_retention_policy_tag = false
## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
## Only takes effect when using HTTP.
# write_consistency = "any"
@ -250,23 +259,25 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
}
config := &HTTPConfig{
URL: url,
Timeout: i.Timeout.Duration,
TLSConfig: tlsConfig,
UserAgent: i.UserAgent,
Username: i.Username,
Password: i.Password,
Proxy: proxy,
ContentEncoding: i.ContentEncoding,
Headers: i.HTTPHeaders,
Database: i.Database,
DatabaseTag: i.DatabaseTag,
ExcludeDatabaseTag: i.ExcludeDatabaseTag,
SkipDatabaseCreation: i.SkipDatabaseCreation,
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
Serializer: i.newSerializer(),
Log: i.Log,
URL: url,
Timeout: i.Timeout.Duration,
TLSConfig: tlsConfig,
UserAgent: i.UserAgent,
Username: i.Username,
Password: i.Password,
Proxy: proxy,
ContentEncoding: i.ContentEncoding,
Headers: i.HTTPHeaders,
Database: i.Database,
DatabaseTag: i.DatabaseTag,
ExcludeDatabaseTag: i.ExcludeDatabaseTag,
SkipDatabaseCreation: i.SkipDatabaseCreation,
RetentionPolicy: i.RetentionPolicy,
RetentionPolicyTag: i.RetentionPolicyTag,
ExcludeRetentionPolicyTag: i.ExcludeRetentionPolicyTag,
Consistency: i.WriteConsistency,
Serializer: i.newSerializer(),
Log: i.Log,
}
c, err := i.CreateHTTPClientF(config)