Fix missing timeouts in vsphere input (#4840)

This commit is contained in:
Pontus Rydin 2018-10-11 16:08:09 -04:00 committed by Daniel Nelson
parent bde73d8328
commit c117ed624d
3 changed files with 85 additions and 31 deletions

View File

@ -34,6 +34,7 @@ type Client struct {
Root *view.ContainerView Root *view.ContainerView
Perf *performance.Manager Perf *performance.Manager
Valid bool Valid bool
Timeout time.Duration
closeGate sync.Once closeGate sync.Once
} }
@ -53,7 +54,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
defer cf.mux.Unlock() defer cf.mux.Unlock()
if cf.client == nil { if cf.client == nil {
var err error var err error
if cf.client, err = NewClient(cf.url, cf.parent); err != nil { if cf.client, err = NewClient(ctx, cf.url, cf.parent); err != nil {
return nil, err return nil, err
} }
} }
@ -61,9 +62,13 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
// Execute a dummy call against the server to make sure the client is // Execute a dummy call against the server to make sure the client is
// still functional. If not, try to log back in. If that doesn't work, // still functional. If not, try to log back in. If that doesn't work,
// we give up. // we give up.
if _, err := methods.GetCurrentTime(ctx, cf.client.Client); err != nil { ctx1, cancel1 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel1()
if _, err := methods.GetCurrentTime(ctx1, cf.client.Client); err != nil {
log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!") log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!")
if cf.client.Client.SessionManager.Login(ctx, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel2()
if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
return nil, err return nil, err
} }
} }
@ -72,7 +77,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
} }
// NewClient creates a new vSphere client based on the url and setting passed as parameters. // NewClient creates a new vSphere client based on the url and setting passed as parameters.
func NewClient(u *url.URL, vs *VSphere) (*Client, error) { func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
sw := NewStopwatch("connect", u.Host) sw := NewStopwatch("connect", u.Host)
tlsCfg, err := vs.ClientConfig.TLSConfig() tlsCfg, err := vs.ClientConfig.TLSConfig()
if err != nil { if err != nil {
@ -85,7 +90,6 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
if vs.Username != "" { if vs.Username != "" {
u.User = url.UserPassword(vs.Username, vs.Password) u.User = url.UserPassword(vs.Username, vs.Password)
} }
ctx := context.Background()
log.Printf("D! [input.vsphere]: Creating client: %s", u.Host) log.Printf("D! [input.vsphere]: Creating client: %s", u.Host)
soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify) soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify)
@ -103,7 +107,9 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
} }
} }
vimClient, err := vim25.NewClient(ctx, soapClient) ctx1, cancel1 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel1()
vimClient, err := vim25.NewClient(ctx1, soapClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -111,7 +117,9 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
// If TSLKey is specified, try to log in as an extension using a cert. // If TSLKey is specified, try to log in as an extension using a cert.
if vs.TLSKey != "" { if vs.TLSKey != "" {
if err := sm.LoginExtensionByCertificate(ctx, vs.TLSKey); err != nil { ctx2, cancel2 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel2()
if err := sm.LoginExtensionByCertificate(ctx2, vs.TLSKey); err != nil {
return nil, err return nil, err
} }
} }
@ -147,6 +155,7 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
Root: v, Root: v,
Perf: p, Perf: p,
Valid: true, Valid: true,
Timeout: vs.Timeout.Duration,
}, nil }, nil
} }
@ -164,7 +173,8 @@ func (c *Client) close() {
// Use a Once to prevent us from panics stemming from trying // Use a Once to prevent us from panics stemming from trying
// to close it multiple times. // to close it multiple times.
c.closeGate.Do(func() { c.closeGate.Do(func() {
ctx := context.Background() ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
if c.Client != nil { if c.Client != nil {
if err := c.Client.Logout(ctx); err != nil { if err := c.Client.Logout(ctx); err != nil {
log.Printf("E! [input.vsphere]: Error during logout: %s", err) log.Printf("E! [input.vsphere]: Error during logout: %s", err)

View File

@ -46,7 +46,7 @@ type resourceKind struct {
objects objectMap objects objectMap
filters filter.Filter filters filter.Filter
collectInstances bool collectInstances bool
getObjects func(context.Context, *view.ContainerView) (objectMap, error) getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error)
} }
type metricEntry struct { type metricEntry struct {
@ -253,7 +253,9 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro
return nil, err return nil, err
} }
mn, err := client.Perf.CounterInfoByName(ctx) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
mn, err := client.Perf.CounterInfoByName(ctx1)
if err != nil { if err != nil {
return nil, err return nil, err
@ -272,7 +274,9 @@ func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{}
} }
rq := in.(*metricQRequest) rq := in.(*metricQRequest)
metrics, err := client.Perf.AvailableMetric(ctx, rq.obj.ref.Reference(), rq.res.sampling) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling)
if err != nil && err != context.Canceled { if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
} }
@ -292,7 +296,9 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache
path = append(path, here.Reference().String()) path = append(path, here.Reference().String())
o := object.NewCommon(client.Client.Client, r) o := object.NewCommon(client.Client.Client, r)
var result mo.ManagedEntity var result mo.ManagedEntity
err := o.Properties(ctx, here, []string{"parent", "name"}, &result) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := o.Properties(ctx1, here, []string{"parent", "name"}, &result)
if err != nil { if err != nil {
log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err) log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err)
break break
@ -344,7 +350,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled (but datastore) // Need to do this for all resource types even if they are not enabled (but datastore)
if res.enabled || (k != "datastore" && k != "vm") { if res.enabled || (k != "datastore" && k != "vm") {
objects, err := res.getObjects(ctx, client.Root) objects, err := res.getObjects(ctx, e, client.Root)
if err != nil { if err != nil {
return err return err
} }
@ -411,9 +417,11 @@ func (e *Endpoint) discover(ctx context.Context) error {
return nil return nil
} }
func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, error) { func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datacenter var resources []mo.Datacenter
err := root.Retrieve(ctx, []string{"Datacenter"}, []string{"name", "parent"}, &resources) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -425,9 +433,11 @@ func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, e
return m, nil return m, nil
} }
func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, error) { func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.ClusterComputeResource var resources []mo.ClusterComputeResource
err := root.Retrieve(ctx, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -439,7 +449,9 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
if !ok { if !ok {
o := object.NewFolder(root.Client(), *r.Parent) o := object.NewFolder(root.Client(), *r.Parent)
var folder mo.Folder var folder mo.Folder
err := o.Properties(ctx, *r.Parent, []string{"parent"}, &folder) ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
err := o.Properties(ctx2, *r.Parent, []string{"parent"}, &folder)
if err != nil { if err != nil {
log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err) log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err)
p = nil p = nil
@ -455,7 +467,7 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
return m, nil return m, nil
} }
func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error) { func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.HostSystem var resources []mo.HostSystem
err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources) err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
if err != nil { if err != nil {
@ -469,9 +481,11 @@ func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error)
return m, nil return m, nil
} }
func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) { func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.VirtualMachine var resources []mo.VirtualMachine
err := root.Retrieve(ctx, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -491,9 +505,11 @@ func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) {
return m, nil return m, nil
} }
func getDatastores(ctx context.Context, root *view.ContainerView) (objectMap, error) { func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datastore var resources []mo.Datastore
err := root.Retrieve(ctx, []string{"Datastore"}, []string{"name", "parent"}, &resources) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent"}, &resources)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -696,17 +712,23 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
return 0, err return 0, err
} }
metricInfo, err := client.Perf.CounterInfoByName(ctx) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metricInfo, err := client.Perf.CounterInfoByName(ctx1)
if err != nil { if err != nil {
return count, err return count, err
} }
metrics, err := client.Perf.Query(ctx, pqs) ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
metrics, err := client.Perf.Query(ctx2, pqs)
if err != nil { if err != nil {
return count, err return count, err
} }
ems, err := client.Perf.ToMetricSeries(ctx, metrics) ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel3()
ems, err := client.Perf.ToMetricSeries(ctx3, metrics)
if err != nil { if err != nil {
return count, err return count, err
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"sort" "sort"
"strings"
"testing" "testing"
"time" "time"
@ -229,6 +230,27 @@ func TestWorkerPool(t *testing.T) {
} }
} }
func TestTimeout(t *testing.T) {
m, s, err := createSim()
if err != nil {
t.Fatal(err)
}
defer m.Remove()
defer s.Close()
var acc testutil.Accumulator
v := defaultVSphere()
v.Vcenters = []string{s.URL.String()}
v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond}
require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil.
defer v.Stop()
require.NoError(t, v.Gather(&acc))
// The accumulator must contain exactly one error and it must be a deadline exceeded.
require.Equal(t, 1, len(acc.Errors))
require.True(t, strings.Contains(acc.Errors[0].Error(), "context deadline exceeded"))
}
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
m, s, err := createSim() m, s, err := createSim()
if err != nil { if err != nil {