Add support for connecting to InfluxDB over a unix domain socket (#3942)

This commit is contained in:
Daniel Nelson 2018-03-27 18:36:08 -07:00 committed by GitHub
parent b0b18df0bf
commit 43bd23e555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 8 deletions

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -164,14 +165,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
config.Consistency) config.Consistency)
queryURL := makeQueryURL(config.URL) queryURL := makeQueryURL(config.URL)
var transport *http.Transport
switch config.URL.Scheme {
case "http", "https":
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: config.TLSConfig,
}
case "unix":
transport = &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.DialTimeout(
config.URL.Scheme,
config.URL.Path,
defaultRequestTimeout,
)
},
}
default:
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
}
client := &httpClient{ client := &httpClient{
serializer: serializer, serializer: serializer,
client: &http.Client{ client: &http.Client{
Timeout: timeout, Timeout: timeout,
Transport: &http.Transport{ Transport: transport,
Proxy: proxy,
TLSClientConfig: config.TLSConfig,
},
}, },
database: database, database: database,
url: config.URL, url: config.URL,
@ -392,13 +411,27 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string {
} }
u := *loc u := *loc
switch u.Scheme {
case "unix":
u.Scheme = "http"
u.Host = "127.0.0.1"
u.Path = "/write"
case "http":
u.Path = path.Join(u.Path, "write") u.Path = path.Join(u.Path, "write")
}
u.RawQuery = params.Encode() u.RawQuery = params.Encode()
return u.String() return u.String()
} }
func makeQueryURL(loc *url.URL) string { func makeQueryURL(loc *url.URL) string {
u := *loc u := *loc
switch u.Scheme {
case "unix":
u.Scheme = "http"
u.Host = "127.0.0.1"
u.Path = "/query"
case "http":
u.Path = path.Join(u.Path, "query") u.Path = path.Join(u.Path, "query")
}
return u.String() return u.String()
} }

View File

@ -7,9 +7,12 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os"
"path"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -556,3 +559,80 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
err = client.Write(ctx, metrics) err = client.Write(ctx, metrics)
require.NoError(t, err) require.NoError(t, err)
} }
func TestHTTP_UnixSocket(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf-test")
if err != nil {
require.NoError(t, err)
}
defer os.RemoveAll(tmpdir)
sock := path.Join(tmpdir, "test.sock")
listener, err := net.Listen("unix", sock)
require.NoError(t, err)
ts := httptest.NewUnstartedServer(http.NotFoundHandler())
ts.Listener = listener
ts.Start()
defer ts.Close()
x, _ := url.Parse("unix://" + sock)
fmt.Println(x)
successResponse := []byte(`{"results": [{"statement_id": 0}]}`)
tests := []struct {
name string
config *influxdb.HTTPConfig
database string
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error)
}{
{
name: "success",
config: &influxdb.HTTPConfig{
URL: &url.URL{Scheme: "unix", Path: sock},
Database: "xyzzy",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q"))
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
writeHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
w.Write(successResponse)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
tt.queryHandlerFunc(t, w, r)
return
case "/write":
tt.queryHandlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})
ctx := context.Background()
client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)
err = client.CreateDatabase(ctx)
if tt.errFunc != nil {
tt.errFunc(t, err)
} else {
require.NoError(t, err)
}
})
}
}

View File

@ -70,6 +70,7 @@ var sampleConfig = `
## ##
## Multiple URLs can be specified for a single cluster, only ONE of the ## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval. ## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"] # urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"] # urls = ["http://127.0.0.1:8086"]
@ -157,7 +158,7 @@ func (i *InfluxDB) Connect() error {
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
case "http", "https": case "http", "https", "unix":
c, err := i.httpClient(ctx, u, proxy) c, err := i.httpClient(ctx, u, proxy)
if err != nil { if err != nil {
return err return err