diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 4f3d57e63..164261feb 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -159,12 +159,18 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { serializer = influx.NewSerializer() } - writeURL := makeWriteURL( + writeURL, err := makeWriteURL( config.URL, database, config.RetentionPolicy, config.Consistency) - queryURL := makeQueryURL(config.URL) + if err != nil { + return nil, err + } + queryURL, err := makeQueryURL(config.URL) + if err != nil { + return nil, err + } var transport *http.Transport switch config.URL.Scheme { @@ -399,7 +405,7 @@ func (c *httpClient) addHeaders(req *http.Request) { } } -func makeWriteURL(loc *url.URL, db, rp, consistency string) string { +func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) { params := url.Values{} params.Set("db", db) @@ -417,24 +423,26 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string { u.Scheme = "http" u.Host = "127.0.0.1" u.Path = "/write" - case "http": - case "https": + case "http", "https": u.Path = path.Join(u.Path, "write") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) } u.RawQuery = params.Encode() - return u.String() + return u.String(), nil } -func makeQueryURL(loc *url.URL) string { +func makeQueryURL(loc *url.URL) (string, error) { u := *loc switch u.Scheme { case "unix": u.Scheme = "http" u.Host = "127.0.0.1" u.Path = "/query" - case "http": - case "https": + case "http", "https": u.Path = path.Join(u.Path, "query") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) } - return u.String() + return u.String(), nil } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index e84957fcf..30cc1f8b6 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -46,6 +46,17 @@ func TestHTTP_MinimalConfig(t *testing.T) { require.NoError(t, err) } +func TestHTTP_UnsupportedScheme(t *testing.T) { + config := &influxdb.HTTPConfig{ + URL: &url.URL{ + Scheme: "foo", + Host: "localhost", + }, + } + _, err := influxdb.NewHTTPClient(config) + require.Error(t, err) +} + func TestHTTP_CreateDatabase(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() @@ -576,9 +587,6 @@ func TestHTTP_UnixSocket(t *testing.T) { ts.Start() defer ts.Close() - x, _ := url.Parse("unix://" + sock) - fmt.Println(x) - successResponse := []byte(`{"results": [{"statement_id": 0}]}`) tests := []struct {