Add support for connecting to InfluxDB over a unix domain socket (#3942)
This commit is contained in:
parent
bc9123848b
commit
937c7365af
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
|
@ -164,14 +165,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
|
|||
config.Consistency)
|
||||
queryURL := makeQueryURL(config.URL)
|
||||
|
||||
var transport *http.Transport
|
||||
switch config.URL.Scheme {
|
||||
case "http", "https":
|
||||
transport = &http.Transport{
|
||||
Proxy: proxy,
|
||||
TLSClientConfig: config.TLSConfig,
|
||||
}
|
||||
case "unix":
|
||||
transport = &http.Transport{
|
||||
Dial: func(_, _ string) (net.Conn, error) {
|
||||
return net.DialTimeout(
|
||||
config.URL.Scheme,
|
||||
config.URL.Path,
|
||||
defaultRequestTimeout,
|
||||
)
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
|
||||
}
|
||||
|
||||
client := &httpClient{
|
||||
serializer: serializer,
|
||||
client: &http.Client{
|
||||
Timeout: timeout,
|
||||
Transport: &http.Transport{
|
||||
Proxy: proxy,
|
||||
TLSClientConfig: config.TLSConfig,
|
||||
},
|
||||
Transport: transport,
|
||||
},
|
||||
database: database,
|
||||
url: config.URL,
|
||||
|
@ -392,13 +411,27 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string {
|
|||
}
|
||||
|
||||
u := *loc
|
||||
switch u.Scheme {
|
||||
case "unix":
|
||||
u.Scheme = "http"
|
||||
u.Host = "127.0.0.1"
|
||||
u.Path = "/write"
|
||||
case "http":
|
||||
u.Path = path.Join(u.Path, "write")
|
||||
}
|
||||
u.RawQuery = params.Encode()
|
||||
return u.String()
|
||||
}
|
||||
|
||||
func makeQueryURL(loc *url.URL) string {
|
||||
u := *loc
|
||||
switch u.Scheme {
|
||||
case "unix":
|
||||
u.Scheme = "http"
|
||||
u.Host = "127.0.0.1"
|
||||
u.Path = "/query"
|
||||
case "http":
|
||||
u.Path = path.Join(u.Path, "query")
|
||||
}
|
||||
return u.String()
|
||||
}
|
||||
|
|
|
@ -7,9 +7,12 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -556,3 +559,80 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
|
|||
err = client.Write(ctx, metrics)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestHTTP_UnixSocket(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "telegraf-test")
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
sock := path.Join(tmpdir, "test.sock")
|
||||
listener, err := net.Listen("unix", sock)
|
||||
require.NoError(t, err)
|
||||
|
||||
ts := httptest.NewUnstartedServer(http.NotFoundHandler())
|
||||
ts.Listener = listener
|
||||
ts.Start()
|
||||
defer ts.Close()
|
||||
|
||||
x, _ := url.Parse("unix://" + sock)
|
||||
fmt.Println(x)
|
||||
|
||||
successResponse := []byte(`{"results": [{"statement_id": 0}]}`)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
config *influxdb.HTTPConfig
|
||||
database string
|
||||
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||
writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||
errFunc func(t *testing.T, err error)
|
||||
}{
|
||||
{
|
||||
name: "success",
|
||||
config: &influxdb.HTTPConfig{
|
||||
URL: &url.URL{Scheme: "unix", Path: sock},
|
||||
Database: "xyzzy",
|
||||
},
|
||||
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q"))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(successResponse)
|
||||
},
|
||||
writeHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
w.Write(successResponse)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/query":
|
||||
tt.queryHandlerFunc(t, w, r)
|
||||
return
|
||||
case "/write":
|
||||
tt.queryHandlerFunc(t, w, r)
|
||||
return
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
client, err := influxdb.NewHTTPClient(tt.config)
|
||||
require.NoError(t, err)
|
||||
err = client.CreateDatabase(ctx)
|
||||
if tt.errFunc != nil {
|
||||
tt.errFunc(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ var sampleConfig = `
|
|||
##
|
||||
## Multiple URLs can be specified for a single cluster, only ONE of the
|
||||
## urls will be written to each interval.
|
||||
# urls = ["unix:///var/run/influxdb.sock"]
|
||||
# urls = ["udp://127.0.0.1:8089"]
|
||||
# urls = ["http://127.0.0.1:8086"]
|
||||
|
||||
|
@ -157,7 +158,7 @@ func (i *InfluxDB) Connect() error {
|
|||
}
|
||||
|
||||
i.clients = append(i.clients, c)
|
||||
case "http", "https":
|
||||
case "http", "https", "unix":
|
||||
c, err := i.httpClient(ctx, u, proxy)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue