Run create database query once per database (#7333)
This commit is contained in:
parent
c4e9f72936
commit
b77dac9fdf
|
@ -107,9 +107,13 @@ type HTTPConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpClient struct {
|
type httpClient struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
config HTTPConfig
|
config HTTPConfig
|
||||||
createdDatabases map[string]bool
|
// Tracks that the 'create database` statement was executed for the
|
||||||
|
// database. An attempt to create the database is made each time a new
|
||||||
|
// database is encountered in the database_tag and after a "database not
|
||||||
|
// found" error occurs.
|
||||||
|
createDatabaseExecuted map[string]bool
|
||||||
|
|
||||||
log telegraf.Logger
|
log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
@ -177,9 +181,9 @@ func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
|
||||||
Timeout: config.Timeout,
|
Timeout: config.Timeout,
|
||||||
Transport: transport,
|
Transport: transport,
|
||||||
},
|
},
|
||||||
createdDatabases: make(map[string]bool),
|
createDatabaseExecuted: make(map[string]bool),
|
||||||
config: config,
|
config: config,
|
||||||
log: config.Log,
|
log: config.Log,
|
||||||
}
|
}
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
@ -215,7 +219,6 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if resp.StatusCode == 200 {
|
if resp.StatusCode == 200 {
|
||||||
c.createdDatabases[database] = true
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,12 +228,19 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Even with a 200 response there can be an error
|
// Even with a 200 status code there can be an error in the response body.
|
||||||
|
// If there is also no error string then the operation was successful.
|
||||||
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
|
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
|
||||||
c.createdDatabases[database] = true
|
c.createDatabaseExecuted[database] = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Don't attempt to recreate the database after a 403 Forbidden error.
|
||||||
|
// This behavior exists only to maintain backwards compatiblity.
|
||||||
|
if resp.StatusCode == http.StatusForbidden {
|
||||||
|
c.createDatabaseExecuted[database] = true
|
||||||
|
}
|
||||||
|
|
||||||
return &APIError{
|
return &APIError{
|
||||||
StatusCode: resp.StatusCode,
|
StatusCode: resp.StatusCode,
|
||||||
Title: resp.Status,
|
Title: resp.Status,
|
||||||
|
@ -284,7 +294,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
for dbrp, batch := range batches {
|
for dbrp, batch := range batches {
|
||||||
if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] {
|
if !c.config.SkipDatabaseCreation && !c.createDatabaseExecuted[dbrp.Database] {
|
||||||
err := c.CreateDatabase(ctx, dbrp.Database)
|
err := c.CreateDatabase(ctx, dbrp.Database)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
|
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
|
||||||
|
|
|
@ -959,3 +959,179 @@ func TestDBRPTags(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MockHandlerChain struct {
|
||||||
|
handlers []http.HandlerFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MockHandlerChain) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if len(h.handlers) == 0 {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
next, rest := h.handlers[0], h.handlers[1:]
|
||||||
|
h.handlers = rest
|
||||||
|
next(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MockHandlerChain) Done() bool {
|
||||||
|
return len(h.handlers) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDBRPTagsCreateDatabaseNotCalledOnRetryAfterForbidden(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
handlers := &MockHandlerChain{
|
||||||
|
handlers: []http.HandlerFunc{
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/query":
|
||||||
|
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ts.Config.Handler = handlers
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
output := influxdb.InfluxDB{
|
||||||
|
URL: u.String(),
|
||||||
|
Database: "telegraf",
|
||||||
|
DatabaseTag: "database",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
|
||||||
|
return influxdb.NewHTTPClient(*config)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = output.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = output.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = output.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.True(t, handlers.Done(), "all handlers not called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
handlers := &MockHandlerChain{
|
||||||
|
handlers: []http.HandlerFunc{
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/query":
|
||||||
|
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
w.Write([]byte(`{"error": "database not found: \"telegraf\""}`))
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/query":
|
||||||
|
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/write":
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ts.Config.Handler = handlers
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
output := influxdb.InfluxDB{
|
||||||
|
URL: u.String(),
|
||||||
|
Database: "telegraf",
|
||||||
|
DatabaseTag: "database",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
|
||||||
|
return influxdb.NewHTTPClient(*config)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = output.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = output.Write(metrics)
|
||||||
|
require.Error(t, err)
|
||||||
|
err = output.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.True(t, handlers.Done(), "all handlers not called")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue