diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 852784f0d..fdbb5bd8c 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "log" + "net" "net/http" "net/url" "path" @@ -164,14 +165,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { config.Consistency) queryURL := makeQueryURL(config.URL) + var transport *http.Transport + switch config.URL.Scheme { + case "http", "https": + transport = &http.Transport{ + Proxy: proxy, + TLSClientConfig: config.TLSConfig, + } + case "unix": + transport = &http.Transport{ + Dial: func(_, _ string) (net.Conn, error) { + return net.DialTimeout( + config.URL.Scheme, + config.URL.Path, + defaultRequestTimeout, + ) + }, + } + default: + return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme) + } + client := &httpClient{ serializer: serializer, client: &http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - Proxy: proxy, - TLSClientConfig: config.TLSConfig, - }, + Timeout: timeout, + Transport: transport, }, database: database, url: config.URL, @@ -392,13 +411,27 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string { } u := *loc - u.Path = path.Join(u.Path, "write") + switch u.Scheme { + case "unix": + u.Scheme = "http" + u.Host = "127.0.0.1" + u.Path = "/write" + case "http": + u.Path = path.Join(u.Path, "write") + } u.RawQuery = params.Encode() return u.String() } func makeQueryURL(loc *url.URL) string { u := *loc - u.Path = path.Join(u.Path, "query") + switch u.Scheme { + case "unix": + u.Scheme = "http" + u.Host = "127.0.0.1" + u.Path = "/query" + case "http": + u.Path = path.Join(u.Path, "query") + } return u.String() } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index d6463c050..e84957fcf 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -7,9 +7,12 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/http" "net/http/httptest" "net/url" + "os" + "path" "strings" "testing" "time" @@ -556,3 +559,80 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) { err = client.Write(ctx, metrics) require.NoError(t, err) } + +func TestHTTP_UnixSocket(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "telegraf-test") + if err != nil { + require.NoError(t, err) + } + defer os.RemoveAll(tmpdir) + + sock := path.Join(tmpdir, "test.sock") + listener, err := net.Listen("unix", sock) + require.NoError(t, err) + + ts := httptest.NewUnstartedServer(http.NotFoundHandler()) + ts.Listener = listener + ts.Start() + defer ts.Close() + + x, _ := url.Parse("unix://" + sock) + fmt.Println(x) + + successResponse := []byte(`{"results": [{"statement_id": 0}]}`) + + tests := []struct { + name string + config *influxdb.HTTPConfig + database string + queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + errFunc func(t *testing.T, err error) + }{ + { + name: "success", + config: &influxdb.HTTPConfig{ + URL: &url.URL{Scheme: "unix", Path: sock}, + Database: "xyzzy", + }, + queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q")) + w.WriteHeader(http.StatusOK) + w.Write(successResponse) + }, + writeHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + w.Write(successResponse) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/query": + tt.queryHandlerFunc(t, w, r) + return + case "/write": + tt.queryHandlerFunc(t, w, r) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }) + + ctx := context.Background() + + client, err := influxdb.NewHTTPClient(tt.config) + require.NoError(t, err) + err = client.CreateDatabase(ctx) + if tt.errFunc != nil { + tt.errFunc(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 5f38263b8..2e6e0d9c6 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -70,6 +70,7 @@ var sampleConfig = ` ## ## Multiple URLs can be specified for a single cluster, only ONE of the ## urls will be written to each interval. + # urls = ["unix:///var/run/influxdb.sock"] # urls = ["udp://127.0.0.1:8089"] # urls = ["http://127.0.0.1:8086"] @@ -157,7 +158,7 @@ func (i *InfluxDB) Connect() error { } i.clients = append(i.clients, c) - case "http", "https": + case "http", "https", "unix": c, err := i.httpClient(ctx, u, proxy) if err != nil { return err