Close idle connections in influxdb outputs when reloading (#5912)
This commit is contained in:
		
							parent
							
								
									9cc3b3d234
								
							
						
					
					
						commit
						17d66b864c
					
				|  | @ -0,0 +1,15 @@ | ||||||
|  | // +build !go1.12
 | ||||||
|  | 
 | ||||||
|  | package internal | ||||||
|  | 
 | ||||||
|  | import "net/http" | ||||||
|  | 
 | ||||||
|  | func CloseIdleConnections(c *http.Client) { | ||||||
|  | 	type closeIdler interface { | ||||||
|  | 		CloseIdleConnections() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if tr, ok := c.Transport.(closeIdler); ok { | ||||||
|  | 		tr.CloseIdleConnections() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | @ -0,0 +1,9 @@ | ||||||
|  | // +build go1.12
 | ||||||
|  | 
 | ||||||
|  | package internal | ||||||
|  | 
 | ||||||
|  | import "net/http" | ||||||
|  | 
 | ||||||
|  | func CloseIdleConnections(c *http.Client) { | ||||||
|  | 	c.CloseIdleConnections() | ||||||
|  | } | ||||||
|  | @ -448,3 +448,7 @@ func makeQueryURL(loc *url.URL) (string, error) { | ||||||
| 	} | 	} | ||||||
| 	return u.String(), nil | 	return u.String(), nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *httpClient) Close() { | ||||||
|  | 	internal.CloseIdleConnections(c.client) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -27,6 +27,7 @@ type Client interface { | ||||||
| 	CreateDatabase(ctx context.Context, database string) error | 	CreateDatabase(ctx context.Context, database string) error | ||||||
| 	Database() string | 	Database() string | ||||||
| 	URL() string | 	URL() string | ||||||
|  | 	Close() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // InfluxDB struct is the primary data structure for the plugin
 | // InfluxDB struct is the primary data structure for the plugin
 | ||||||
|  | @ -183,6 +184,9 @@ func (i *InfluxDB) Connect() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (i *InfluxDB) Close() error { | func (i *InfluxDB) Close() error { | ||||||
|  | 	for _, client := range i.clients { | ||||||
|  | 		client.Close() | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -19,6 +19,7 @@ type MockClient struct { | ||||||
| 	WriteF          func(context.Context, []telegraf.Metric) error | 	WriteF          func(context.Context, []telegraf.Metric) error | ||||||
| 	CreateDatabaseF func(ctx context.Context, database string) error | 	CreateDatabaseF func(ctx context.Context, database string) error | ||||||
| 	DatabaseF       func() string | 	DatabaseF       func() string | ||||||
|  | 	CloseF          func() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *MockClient) URL() string { | func (c *MockClient) URL() string { | ||||||
|  | @ -37,6 +38,10 @@ func (c *MockClient) Database() string { | ||||||
| 	return c.DatabaseF() | 	return c.DatabaseF() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *MockClient) Close() { | ||||||
|  | 	c.CloseF() | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestDeprecatedURLSupport(t *testing.T) { | func TestDeprecatedURLSupport(t *testing.T) { | ||||||
| 	var actual *influxdb.UDPConfig | 	var actual *influxdb.UDPConfig | ||||||
| 	output := influxdb.InfluxDB{ | 	output := influxdb.InfluxDB{ | ||||||
|  |  | ||||||
|  | @ -136,3 +136,6 @@ func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { | ||||||
| 	} | 	} | ||||||
| 	return 0, nil, nil | 	return 0, nil, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *udpClient) Close() { | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -307,3 +307,7 @@ func makeWriteURL(loc url.URL, org, bucket string) (string, error) { | ||||||
| 	loc.RawQuery = params.Encode() | 	loc.RawQuery = params.Encode() | ||||||
| 	return loc.String(), nil | 	return loc.String(), nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *httpClient) Close() { | ||||||
|  | 	internal.CloseIdleConnections(c.client) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -74,6 +74,7 @@ type Client interface { | ||||||
| 	Write(context.Context, []telegraf.Metric) error | 	Write(context.Context, []telegraf.Metric) error | ||||||
| 
 | 
 | ||||||
| 	URL() string // for logging
 | 	URL() string // for logging
 | ||||||
|  | 	Close() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type InfluxDB struct { | type InfluxDB struct { | ||||||
|  | @ -137,6 +138,9 @@ func (i *InfluxDB) Connect() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (i *InfluxDB) Close() error { | func (i *InfluxDB) Close() error { | ||||||
|  | 	for _, client := range i.clients { | ||||||
|  | 		client.Close() | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue