Fix discovery race condition in vsphere input (#5217)

This commit is contained in:
Pontus Rydin 2019-01-03 14:30:05 -05:00 committed by Daniel Nelson
parent 184f7b6a8b
commit 3356f1dc82
1 changed files with 11 additions and 9 deletions

View File

@ -39,7 +39,7 @@ const maxMetadataSamples = 100 // Number of resources to sample for metric metad
type Endpoint struct { type Endpoint struct {
Parent *VSphere Parent *VSphere
URL *url.URL URL *url.URL
resourceKinds map[string]resourceKind resourceKinds map[string]*resourceKind
hwMarks *TSCache hwMarks *TSCache
lun2ds map[string]string lun2ds map[string]string
discoveryTicker *time.Ticker discoveryTicker *time.Ticker
@ -107,7 +107,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
clientFactory: NewClientFactory(ctx, url, parent), clientFactory: NewClientFactory(ctx, url, parent),
} }
e.resourceKinds = map[string]resourceKind{ e.resourceKinds = map[string]*resourceKind{
"datacenter": { "datacenter": {
name: "datacenter", name: "datacenter",
pKey: "dcname", pKey: "dcname",
@ -363,6 +363,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
numRes := int64(0) numRes := int64(0)
// Populate resource objects, and endpoint instance info. // Populate resource objects, and endpoint instance info.
newObjects := make(map[string]objectMap)
for k, res := range e.resourceKinds { for k, res := range e.resourceKinds {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled // Need to do this for all resource types even if they are not enabled
@ -385,13 +386,12 @@ func (e *Endpoint) discover(ctx context.Context) error {
// No need to collect metric metadata if resource type is not enabled // No need to collect metric metadata if resource type is not enabled
if res.enabled { if res.enabled {
if res.simple { if res.simple {
e.simpleMetadataSelect(ctx, client, &res) e.simpleMetadataSelect(ctx, client, res)
} else { } else {
e.complexMetadataSelect(ctx, &res, objects, metricNames) e.complexMetadataSelect(ctx, res, objects, metricNames)
} }
} }
res.objects = objects newObjects[k] = objects
resourceKinds[k] = res
SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects))) SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects)))
numRes += int64(len(objects)) numRes += int64(len(objects))
@ -413,7 +413,9 @@ func (e *Endpoint) discover(ctx context.Context) error {
e.collectMux.Lock() e.collectMux.Lock()
defer e.collectMux.Unlock() defer e.collectMux.Unlock()
e.resourceKinds = resourceKinds for k, v := range newObjects {
e.resourceKinds[k].objects = v
}
e.lun2ds = l2d e.lun2ds = l2d
sw.Stop() sw.Stop()
@ -793,9 +795,9 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
latestSample := time.Time{} latestSample := time.Time{}
// Divide workload into chunks and process them concurrently // Divide workload into chunks and process them concurrently
e.chunkify(ctx, &res, now, latest, acc, e.chunkify(ctx, res, now, latest, acc,
func(chunk []types.PerfQuerySpec) { func(chunk []types.PerfQuerySpec) {
n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval) n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, now, estInterval)
log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil { if err != nil {
acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error())) acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error()))