package client import ( "bytes" "compress/gzip" "crypto/tls" "encoding/json" "fmt" "io" "io/ioutil" "net/http" "net/url" "path" "time" ) var ( defaultRequestTimeout = time.Second * 5 ) func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) { // validate required parameters: if len(config.URL) == 0 { return nil, fmt.Errorf("config.URL is required to create an HTTP client") } if len(defaultWP.Database) == 0 { return nil, fmt.Errorf("A default database is required to create an HTTP client") } // set defaults: if config.Timeout == 0 { config.Timeout = defaultRequestTimeout } // parse URL: u, err := url.Parse(config.URL) if err != nil { return nil, fmt.Errorf("error parsing config.URL: %s", err) } if u.Scheme != "http" && u.Scheme != "https" { return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme) } var transport http.Transport if len(config.HTTPProxy) > 0 { proxyURL, err := url.Parse(config.HTTPProxy) if err != nil { return nil, fmt.Errorf("error parsing config.HTTPProxy: %s", err) } transport = http.Transport{ Proxy: http.ProxyURL(proxyURL), TLSClientConfig: config.TLSConfig, } } else { transport = http.Transport{ Proxy: http.ProxyFromEnvironment, TLSClientConfig: config.TLSConfig, } } return &httpClient{ writeURL: writeURL(u, defaultWP), config: config, url: u, client: &http.Client{ Timeout: config.Timeout, Transport: &transport, }, }, nil } type HTTPHeaders map[string]string type HTTPConfig struct { // URL should be of the form "http://host:port" (REQUIRED) URL string // UserAgent sets the User-Agent header. UserAgent string // Timeout specifies a time limit for requests made by this // Client. The timeout includes connection time, any // redirects, and reading the response body. The timer remains // running after Get, Head, Post, or Do return and will // interrupt reading of the Response.Body. // // A Timeout of zero means no timeout. Timeout time.Duration // Username is the basic auth username for the server. Username string // Password is the basic auth password for the server. Password string // TLSConfig is the tls auth settings to use for each request. TLSConfig *tls.Config // Proxy URL should be of the form "http://host:port" HTTPProxy string // HTTP headers to append to HTTP requests. HTTPHeaders HTTPHeaders // The content encoding mechanism to use for each request. ContentEncoding string } // Response represents a list of statement results. type Response struct { // ignore Results: Results []interface{} `json:"-"` Err string `json:"error,omitempty"` } // Error returns the first error from any statement. // Returns nil if no errors occurred on any statements. func (r *Response) Error() error { if r.Err != "" { return fmt.Errorf(r.Err) } return nil } type httpClient struct { writeURL string config HTTPConfig client *http.Client url *url.URL } func (c *httpClient) Query(command string) error { req, err := c.makeRequest(queryURL(c.url, command), bytes.NewReader([]byte(""))) if err != nil { return err } return c.doRequest(req, http.StatusOK) } func (c *httpClient) WriteStream(r io.Reader) error { req, err := c.makeWriteRequest(r, c.writeURL) if err != nil { return err } return c.doRequest(req, http.StatusNoContent) } func (c *httpClient) doRequest( req *http.Request, expectedCode int, ) error { resp, err := c.client.Do(req) if err != nil { return err } code := resp.StatusCode // If it's a "no content" response, then release and return nil if code == http.StatusNoContent { return nil } // not a "no content" response, so parse the result: var response Response body, err := ioutil.ReadAll(resp.Body) if err != nil { return fmt.Errorf("Fatal error reading body: %s", err) } decErr := json.Unmarshal(body, &response) // If we got a JSON decode error, send that back if decErr != nil { err = fmt.Errorf("Unable to decode json: received status code %d err: %s", code, decErr) } // Unexpected response code OR error in JSON response body overrides // a JSON decode error: if code != expectedCode || response.Error() != nil { err = fmt.Errorf("Response Error: Status Code [%d], expected [%d], [%v]", code, expectedCode, response.Error()) } return err } func (c *httpClient) makeWriteRequest( body io.Reader, writeURL string, ) (*http.Request, error) { req, err := c.makeRequest(writeURL, body) if err != nil { return nil, err } if c.config.ContentEncoding == "gzip" { req.Header.Set("Content-Encoding", "gzip") } return req, nil } func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) { var req *http.Request var err error if c.config.ContentEncoding == "gzip" { body, err = compressWithGzip(body) if err != nil { return nil, err } } req, err = http.NewRequest("POST", uri, body) if err != nil { return nil, err } req.Header.Set("Content-Type", "text/plain; charset=utf-8") for header, value := range c.config.HTTPHeaders { req.Header.Set(header, value) } req.Header.Set("User-Agent", c.config.UserAgent) if c.config.Username != "" && c.config.Password != "" { req.SetBasicAuth(c.config.Username, c.config.Password) } return req, nil } func compressWithGzip(data io.Reader) (io.Reader, error) { pr, pw := io.Pipe() gw := gzip.NewWriter(pw) var err error go func() { _, err = io.Copy(gw, data) gw.Close() pw.Close() }() return pr, err } func (c *httpClient) Close() error { // Nothing to do. return nil } func writeURL(u *url.URL, wp WriteParams) string { params := url.Values{} params.Set("db", wp.Database) if wp.RetentionPolicy != "" { params.Set("rp", wp.RetentionPolicy) } if wp.Precision != "n" && wp.Precision != "" { params.Set("precision", wp.Precision) } if wp.Consistency != "one" && wp.Consistency != "" { params.Set("consistency", wp.Consistency) } u.RawQuery = params.Encode() p := u.Path u.Path = path.Join(p, "write") s := u.String() u.Path = p return s } func queryURL(u *url.URL, command string) string { params := url.Values{} params.Set("q", command) u.RawQuery = params.Encode() p := u.Path u.Path = path.Join(p, "query") s := u.String() u.Path = p return s }