Close HTTP2 connections on timeout in influxdb outputs (#7517)
This commit is contained in:
parent
443ac6df23
commit
edd8338180
|
@ -4,6 +4,7 @@ import (
|
||||||
"crypto/subtle"
|
"crypto/subtle"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BasicAuthErrorFunc func(rw http.ResponseWriter)
|
type BasicAuthErrorFunc func(rw http.ResponseWriter)
|
||||||
|
@ -95,3 +96,13 @@ func (h *ipRangeHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
h.onError(rw, http.StatusForbidden)
|
h.onError(rw, http.StatusForbidden)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func OnClientError(client *http.Client, err error) {
|
||||||
|
// Close connection after a timeout error. If this is a HTTP2
|
||||||
|
// connection this ensures that next interval a new connection will be
|
||||||
|
// used and name lookup will be performed.
|
||||||
|
// https://github.com/golang/go/issues/36026
|
||||||
|
if err, ok := err.(*url.Error); ok && err.Timeout() {
|
||||||
|
client.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,15 +0,0 @@
|
||||||
// +build !go1.12
|
|
||||||
|
|
||||||
package internal
|
|
||||||
|
|
||||||
import "net/http"
|
|
||||||
|
|
||||||
func CloseIdleConnections(c *http.Client) {
|
|
||||||
type closeIdler interface {
|
|
||||||
CloseIdleConnections()
|
|
||||||
}
|
|
||||||
|
|
||||||
if tr, ok := c.Transport.(closeIdler); ok {
|
|
||||||
tr.CloseIdleConnections()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
// +build go1.12
|
|
||||||
|
|
||||||
package internal
|
|
||||||
|
|
||||||
import "net/http"
|
|
||||||
|
|
||||||
func CloseIdleConnections(c *http.Client) {
|
|
||||||
c.CloseIdleConnections()
|
|
||||||
}
|
|
|
@ -209,6 +209,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
|
||||||
|
|
||||||
resp, err := c.client.Do(req.WithContext(ctx))
|
resp, err := c.client.Do(req.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
internal.OnClientError(c.client, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
@ -311,7 +312,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
|
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
|
||||||
url, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
|
loc, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -322,13 +323,14 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
|
||||||
}
|
}
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
req, err := c.makeWriteRequest(url, reader)
|
req, err := c.makeWriteRequest(loc, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.client.Do(req.WithContext(ctx))
|
resp, err := c.client.Do(req.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
internal.OnClientError(c.client, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
@ -505,5 +507,5 @@ func makeQueryURL(loc *url.URL) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Close() {
|
func (c *httpClient) Close() {
|
||||||
internal.CloseIdleConnections(c.client)
|
c.client.CloseIdleConnections()
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,7 +210,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
|
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
|
||||||
url, err := makeWriteURL(*c.url, c.Organization, bucket)
|
loc, err := makeWriteURL(*c.url, c.Organization, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -221,13 +221,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
|
||||||
}
|
}
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
req, err := c.makeWriteRequest(url, reader)
|
req, err := c.makeWriteRequest(loc, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.client.Do(req.WithContext(ctx))
|
resp, err := c.client.Do(req.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
internal.OnClientError(c.client, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
@ -347,5 +348,5 @@ func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClient) Close() {
|
func (c *httpClient) Close() {
|
||||||
internal.CloseIdleConnections(c.client)
|
c.client.CloseIdleConnections()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue