Fix vSphere 6.7 missing data issue (#7233)

This commit is contained in:
Pontus Rydin 2020-04-21 14:30:29 -04:00 committed by GitHub
parent d1f109b316
commit d0db0e8f0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 175 additions and 147 deletions

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"math" "math"
"math/rand" "math/rand"
"net/url" "net/url"
@ -32,10 +31,18 @@ var isIPv6 = regexp.MustCompile("^(?:[A-Fa-f0-9]{0,4}:){1,7}[A-Fa-f0-9]{1,4}$")
const metricLookback = 3 // Number of time periods to look back at for non-realtime metrics const metricLookback = 3 // Number of time periods to look back at for non-realtime metrics
const maxSampleConst = 10 // Absolute maximim number of samples regardless of period const rtMetricLookback = 3 // Number of time periods to look back at for realtime metrics
const maxSampleConst = 10 // Absolute maximum number of samples regardless of period
const maxMetadataSamples = 100 // Number of resources to sample for metric metadata const maxMetadataSamples = 100 // Number of resources to sample for metric metadata
const hwMarkTTL = time.Duration(4 * time.Hour)
type queryChunk []types.PerfQuerySpec
type queryJob func(queryChunk)
// Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower
// level Client type. // level Client type.
type Endpoint struct { type Endpoint struct {
@ -52,6 +59,9 @@ type Endpoint struct {
customFields map[int32]string customFields map[int32]string
customAttrFilter filter.Filter customAttrFilter filter.Filter
customAttrEnabled bool customAttrEnabled bool
metricNameLookup map[int32]string
metricNameMux sync.RWMutex
log telegraf.Logger
} }
type resourceKind struct { type resourceKind struct {
@ -107,16 +117,17 @@ func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, boo
// NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed // NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed
// as parameters. // as parameters.
func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, error) { func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL, log telegraf.Logger) (*Endpoint, error) {
e := Endpoint{ e := Endpoint{
URL: url, URL: url,
Parent: parent, Parent: parent,
hwMarks: NewTSCache(1 * time.Hour), hwMarks: NewTSCache(hwMarkTTL),
lun2ds: make(map[string]string), lun2ds: make(map[string]string),
initialized: false, initialized: false,
clientFactory: NewClientFactory(ctx, url, parent), clientFactory: NewClientFactory(ctx, url, parent),
customAttrFilter: newFilterOrPanic(parent.CustomAttributeInclude, parent.CustomAttributeExclude), customAttrFilter: newFilterOrPanic(parent.CustomAttributeInclude, parent.CustomAttributeExclude),
customAttrEnabled: anythingEnabled(parent.CustomAttributeExclude), customAttrEnabled: anythingEnabled(parent.CustomAttributeExclude),
log: log,
} }
e.resourceKinds = map[string]*resourceKind{ e.resourceKinds = map[string]*resourceKind{
@ -254,10 +265,10 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
case <-e.discoveryTicker.C: case <-e.discoveryTicker.C:
err := e.discover(ctx) err := e.discover(ctx)
if err != nil && err != context.Canceled { if err != nil && err != context.Canceled {
e.Parent.Log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error()) e.log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
} }
case <-ctx.Done(): case <-ctx.Done():
e.Parent.Log.Debugf("Exiting discovery goroutine for %s", e.URL.Host) e.log.Debugf("Exiting discovery goroutine for %s", e.URL.Host)
e.discoveryTicker.Stop() e.discoveryTicker.Stop()
return return
} }
@ -268,7 +279,7 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
func (e *Endpoint) initalDiscovery(ctx context.Context) { func (e *Endpoint) initalDiscovery(ctx context.Context) {
err := e.discover(ctx) err := e.discover(ctx)
if err != nil && err != context.Canceled { if err != nil && err != context.Canceled {
e.Parent.Log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error()) e.log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
} }
e.startDiscovery(ctx) e.startDiscovery(ctx)
} }
@ -283,7 +294,7 @@ func (e *Endpoint) init(ctx context.Context) error {
if e.customAttrEnabled { if e.customAttrEnabled {
fields, err := client.GetCustomFields(ctx) fields, err := client.GetCustomFields(ctx)
if err != nil { if err != nil {
e.Parent.Log.Warn("Could not load custom field metadata") e.log.Warn("Could not load custom field metadata")
} else { } else {
e.customFields = fields e.customFields = fields
} }
@ -297,21 +308,29 @@ func (e *Endpoint) init(ctx context.Context) error {
return nil return nil
} }
func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, error) { func (e *Endpoint) getMetricNameForId(id int32) string {
e.metricNameMux.RLock()
defer e.metricNameMux.RUnlock()
return e.metricNameLookup[id]
}
func (e *Endpoint) reloadMetricNameMap(ctx context.Context) error {
e.metricNameMux.Lock()
defer e.metricNameMux.Unlock()
client, err := e.clientFactory.GetClient(ctx) client, err := e.clientFactory.GetClient(ctx)
if err != nil { if err != nil {
return nil, err return err
} }
mn, err := client.CounterInfoByName(ctx) mn, err := client.CounterInfoByName(ctx)
if err != nil { if err != nil {
return nil, err return err
} }
names := make(map[int32]string) e.metricNameLookup = make(map[int32]string)
for name, m := range mn { for name, m := range mn {
names[m.Key] = name e.metricNameLookup[m.Key] = name
} }
return names, nil return nil
} }
func (e *Endpoint) getMetadata(ctx context.Context, obj *objectRef, sampling int32) (performance.MetricList, error) { func (e *Endpoint) getMetadata(ctx context.Context, obj *objectRef, sampling int32) (performance.MetricList, error) {
@ -377,7 +396,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
return ctx.Err() return ctx.Err()
} }
metricNames, err := e.getMetricNameMap(ctx) err := e.reloadMetricNameMap(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -389,7 +408,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
return err return err
} }
e.Parent.Log.Debugf("Discover new objects for %s", e.URL.Host) e.log.Debugf("Discover new objects for %s", e.URL.Host)
dcNameCache := make(map[string]string) dcNameCache := make(map[string]string)
numRes := int64(0) numRes := int64(0)
@ -397,51 +416,47 @@ func (e *Endpoint) discover(ctx context.Context) error {
// Populate resource objects, and endpoint instance info. // Populate resource objects, and endpoint instance info.
newObjects := make(map[string]objectMap) newObjects := make(map[string]objectMap)
for k, res := range e.resourceKinds { for k, res := range e.resourceKinds {
err := func() error { e.log.Debugf("Discovering resources for %s", res.name)
e.Parent.Log.Debugf("Discovering resources for %s", res.name) // Need to do this for all resource types even if they are not enabled
// Need to do this for all resource types even if they are not enabled if res.enabled || k != "vm" {
if res.enabled || k != "vm" { rf := ResourceFilter{
rf := ResourceFilter{ finder: &Finder{client},
finder: &Finder{client}, resType: res.vcName,
resType: res.vcName, paths: res.paths,
paths: res.paths, excludePaths: res.excludePaths}
excludePaths: res.excludePaths,
}
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1() objects, err := res.getObjects(ctx1, e, &rf)
objects, err := res.getObjects(ctx1, e, &rf) cancel1()
if err != nil { if err != nil {
return err return err
} }
// Fill in datacenter names where available (no need to do it for Datacenters) // Fill in datacenter names where available (no need to do it for Datacenters)
if res.name != "Datacenter" { if res.name != "Datacenter" {
for k, obj := range objects { for k, obj := range objects {
if obj.parentRef != nil { if obj.parentRef != nil {
obj.dcname = e.getDatacenterName(ctx, client, dcNameCache, *obj.parentRef) obj.dcname = e.getDatacenterName(ctx, client, dcNameCache, *obj.parentRef)
objects[k] = obj objects[k] = obj
}
} }
} }
}
// No need to collect metric metadata if resource type is not enabled // No need to collect metric metadata if resource type is not enabled
if res.enabled { if res.enabled {
if res.simple { if res.simple {
e.simpleMetadataSelect(ctx, client, res) e.simpleMetadataSelect(ctx, client, res)
} else { } else {
e.complexMetadataSelect(ctx, res, objects, metricNames) e.complexMetadataSelect(ctx, res, objects)
}
} }
newObjects[k] = objects newObjects[k] = objects
SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects))) SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects)))
numRes += int64(len(objects)) numRes += int64(len(objects))
} }
return nil }
}()
if err != nil { if err != nil {
return err e.log.Error(err)
} }
} }
@ -461,7 +476,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
if e.customAttrEnabled { if e.customAttrEnabled {
fields, err = client.GetCustomFields(ctx) fields, err = client.GetCustomFields(ctx)
if err != nil { if err != nil {
e.Parent.Log.Warn("Could not load custom field metadata") e.log.Warn("Could not load custom field metadata")
fields = nil fields = nil
} }
} }
@ -485,10 +500,10 @@ func (e *Endpoint) discover(ctx context.Context) error {
} }
func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) { func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) {
e.Parent.Log.Debugf("Using fast metric metadata selection for %s", res.name) e.log.Debugf("Using fast metric metadata selection for %s", res.name)
m, err := client.CounterInfoByName(ctx) m, err := client.CounterInfoByName(ctx)
if err != nil { if err != nil {
e.Parent.Log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error()) e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
return return
} }
res.metrics = make(performance.MetricList, 0, len(res.include)) res.metrics = make(performance.MetricList, 0, len(res.include))
@ -504,12 +519,12 @@ func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res
} }
res.metrics = append(res.metrics, cnt) res.metrics = append(res.metrics, cnt)
} else { } else {
e.Parent.Log.Warnf("Metric name %s is unknown. Will not be collected", s) e.log.Warnf("Metric name %s is unknown. Will not be collected", s)
} }
} }
} }
func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) { func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap) {
// We're only going to get metadata from maxMetadataSamples resources. If we have // We're only going to get metadata from maxMetadataSamples resources. If we have
// more resources than that, we pick maxMetadataSamples samples at random. // more resources than that, we pick maxMetadataSamples samples at random.
sampledObjects := make([]*objectRef, len(objects)) sampledObjects := make([]*objectRef, len(objects))
@ -537,7 +552,7 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
te.Run(ctx, func() { te.Run(ctx, func() {
metrics, err := e.getMetadata(ctx, obj, res.sampling) metrics, err := e.getMetadata(ctx, obj, res.sampling)
if err != nil { if err != nil {
e.Parent.Log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error()) e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
} }
mMap := make(map[string]types.PerfMetricId) mMap := make(map[string]types.PerfMetricId)
for _, m := range metrics { for _, m := range metrics {
@ -546,11 +561,11 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
} else { } else {
m.Instance = "" m.Instance = ""
} }
if res.filters.Match(metricNames[m.CounterId]) { if res.filters.Match(e.getMetricNameForId(m.CounterId)) {
mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m
} }
} }
e.Parent.Log.Debugf("Found %d metrics for %s", len(mMap), obj.name) e.log.Debugf("Found %d metrics for %s", len(mMap), obj.name)
instInfoMux.Lock() instInfoMux.Lock()
defer instInfoMux.Unlock() defer instInfoMux.Unlock()
if len(mMap) > len(res.metrics) { if len(mMap) > len(res.metrics) {
@ -624,12 +639,6 @@ func getClusters(ctx context.Context, e *Endpoint, filter *ResourceFilter) (obje
cache[r.Parent.Value] = p cache[r.Parent.Value] = p
} }
} }
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: p,
customValues: e.loadCustomAttributes(&r.ManagedEntity),
}
return nil return nil
}() }()
if err != nil { if err != nil {
@ -718,6 +727,23 @@ func getVMs(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap
guest = cleanGuestID(r.Config.GuestId) guest = cleanGuestID(r.Config.GuestId)
uuid = r.Config.Uuid uuid = r.Config.Uuid
} }
cvs := make(map[string]string)
if e.customAttrEnabled {
for _, cv := range r.Summary.CustomValue {
val := cv.(*types.CustomFieldStringValue)
if val.Value == "" {
continue
}
key, ok := e.customFields[val.Key]
if !ok {
e.log.Warnf("Metadata for custom field %d not found. Skipping", val.Key)
continue
}
if e.customAttrFilter.Match(key) {
cvs[key] = val.Value
}
}
}
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{ m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name, name: r.Name,
ref: r.ExtensibleManagedObject.Reference(), ref: r.ExtensibleManagedObject.Reference(),
@ -832,13 +858,13 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error
} }
// Workaround to make sure pqs is a copy of the loop variable and won't change. // Workaround to make sure pqs is a copy of the loop variable and won't change.
func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) { func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job queryJob, pqs queryChunk) {
te.Run(ctx, func() { te.Run(ctx, func() {
job(pqs) job(pqs)
}) })
} }
func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, job func([]types.PerfQuerySpec)) { func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, acc telegraf.Accumulator, job queryJob) {
te := NewThrottledExecutor(e.Parent.CollectConcurrency) te := NewThrottledExecutor(e.Parent.CollectConcurrency)
maxMetrics := e.Parent.MaxQueryMetrics maxMetrics := e.Parent.MaxQueryMetrics
if maxMetrics < 1 { if maxMetrics < 1 {
@ -851,54 +877,48 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
if res.name == "cluster" && maxMetrics > 10 { if res.name == "cluster" && maxMetrics > 10 {
maxMetrics = 10 maxMetrics = 10
} }
pqs := make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
metrics := 0
total := 0
nRes := 0
for _, resource := range res.objects {
mr := len(res.metrics)
for mr > 0 {
mc := mr
headroom := maxMetrics - metrics
if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics
mc = headroom
}
fm := len(res.metrics) - mr
pq := types.PerfQuerySpec{
Entity: resource.ref,
MaxSample: maxSampleConst,
MetricId: res.metrics[fm : fm+mc],
IntervalId: res.sampling,
Format: "normal",
}
start, ok := e.hwMarks.Get(resource.ref.Value) pqs := make(queryChunk, 0, e.Parent.MaxQueryObjects)
for _, object := range res.objects {
timeBuckets := make(map[int64]*types.PerfQuerySpec, 0)
for metricIdx, metric := range res.metrics {
// Determine time of last successful collection
metricName := e.getMetricNameForId(metric.CounterId)
if metricName == "" {
e.log.Info("Unable to find metric name for id %d. Skipping!", metric.CounterId)
continue
}
start, ok := e.hwMarks.Get(object.ref.Value, metricName)
if !ok { if !ok {
// Look back 3 sampling periods by default
start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
} }
pq.StartTime = &start start = start.Truncate(20 * time.Second) // Truncate to maximum resolution
pq.EndTime = &now
// Make sure endtime is always after start time. We may occasionally see samples from the future // Create bucket if we don't already have it
// returned from vCenter. This is presumably due to time drift between vCenter and EXSi nodes. bucket, ok := timeBuckets[start.Unix()]
if pq.StartTime.After(*pq.EndTime) { if !ok {
e.Parent.Log.Debugf("Future sample. Res: %s, StartTime: %s, EndTime: %s, Now: %s", pq.Entity, *pq.StartTime, *pq.EndTime, now) bucket = &types.PerfQuerySpec{
end := start.Add(time.Second) Entity: object.ref,
pq.EndTime = &end MaxSample: maxSampleConst,
MetricId: make([]types.PerfMetricId, 0),
IntervalId: res.sampling,
Format: "normal",
}
bucket.StartTime = &start
bucket.EndTime = &now
timeBuckets[start.Unix()] = bucket
} }
pqs = append(pqs, pq) // Add this metric to the bucket
mr -= mc bucket.MetricId = append(bucket.MetricId, metric)
metrics += mc
// We need to dump the current chunk of metrics for one of two reasons: // Bucket filled to capacity? (Only applies to non real time)
// 1) We filled up the metric quota while processing the current resource // OR if we're past the absolute maximum limit
// 2) We are at the last resource and have no more data to process. if (!res.realTime && len(bucket.MetricId) >= maxMetrics) || len(bucket.MetricId) > 100000 {
// 3) The query contains more than 100,000 individual metrics e.log.Debugf("Submitting partial query: %d metrics (%d remaining) of type %s for %s. Total objects %d",
if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 { len(bucket.MetricId), len(res.metrics)-metricIdx, res.name, e.URL.Host, len(res.objects))
e.Parent.Log.Debugf("Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))
// Don't send work items if the context has been cancelled. // Don't send work items if the context has been cancelled.
if ctx.Err() == context.Canceled { if ctx.Err() == context.Canceled {
@ -906,20 +926,23 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
} }
// Run collection job // Run collection job
submitChunkJob(ctx, te, job, pqs) delete(timeBuckets, start.Unix())
pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects) submitChunkJob(ctx, te, job, queryChunk{*bucket})
metrics = 0 }
nRes = 0 }
// Handle data in time bucket and submit job if we've reached the maximum number of object.
for _, bucket := range timeBuckets {
pqs = append(pqs, *bucket)
if (!res.realTime && len(pqs) > e.Parent.MaxQueryObjects) || len(pqs) > 100000 {
e.log.Debugf("Submitting final bucket job for %s: %d metrics", res.name, len(bucket.MetricId))
submitChunkJob(ctx, te, job, pqs)
pqs = make(queryChunk, 0, e.Parent.MaxQueryObjects)
} }
} }
total++
nRes++
} }
// Handle final partially filled chunk // Submit any jobs left in the queue
if len(pqs) > 0 { if len(pqs) > 0 {
// Run collection job e.log.Debugf("Submitting job for %s: %d objects", res.name, len(pqs))
e.Parent.Log.Debugf("Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
submitChunkJob(ctx, te, job, pqs) submitChunkJob(ctx, te, job, pqs)
} }
@ -950,18 +973,18 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
if estInterval < s { if estInterval < s {
estInterval = s estInterval = s
} }
e.Parent.Log.Debugf("Raw interval %s, padded: %s, estimated: %s", rawInterval, paddedInterval, estInterval) e.log.Debugf("Raw interval %s, padded: %s, estimated: %s", rawInterval, paddedInterval, estInterval)
} }
e.Parent.Log.Debugf("Interval estimated to %s", estInterval) e.log.Debugf("Interval estimated to %s", estInterval)
res.lastColl = localNow res.lastColl = localNow
latest := res.latestSample latest := res.latestSample
if !latest.IsZero() { if !latest.IsZero() {
elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter. elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter.
e.Parent.Log.Debugf("Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) e.log.Debugf("Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
if !res.realTime && elapsed < float64(res.sampling) { if !res.realTime && elapsed < float64(res.sampling) {
// No new data would be available. We're outta here! // No new data would be available. We're outta here!
e.Parent.Log.Debugf("Sampling period for %s of %d has not elapsed on %s", e.log.Debugf("Sampling period for %s of %d has not elapsed on %s",
resourceType, res.sampling, e.URL.Host) resourceType, res.sampling, e.URL.Host)
return nil return nil
} }
@ -972,7 +995,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
internalTags := map[string]string{"resourcetype": resourceType} internalTags := map[string]string{"resourcetype": resourceType}
sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags) sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags)
e.Parent.Log.Debugf("Collecting metrics for %d objects of type %s for %s", e.log.Debugf("Collecting metrics for %d objects of type %s for %s",
len(res.objects), resourceType, e.URL.Host) len(res.objects), resourceType, e.URL.Host)
count := int64(0) count := int64(0)
@ -981,9 +1004,10 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
latestSample := time.Time{} latestSample := time.Time{}
// Divide workload into chunks and process them concurrently // Divide workload into chunks and process them concurrently
e.chunkify(ctx, res, now, latest, e.chunkify(ctx, res, now, latest, acc,
func(chunk []types.PerfQuerySpec) { func(chunk queryChunk) {
n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, estInterval) n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, now, estInterval)
e.log.Debugf("CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil { if err != nil {
acc.AddError(errors.New("while collecting " + res.name + ": " + err.Error())) acc.AddError(errors.New("while collecting " + res.name + ": " + err.Error()))
return return
@ -997,7 +1021,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
} }
}) })
e.Parent.Log.Debugf("Latest sample for %s set to %s", resourceType, latestSample) e.log.Debugf("Latest sample for %s set to %s", resourceType, latestSample)
if !latestSample.IsZero() { if !latestSample.IsZero() {
res.latestSample = latestSample res.latestSample = latestSample
} }
@ -1006,7 +1030,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
return nil return nil
} }
func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) { func (e *Endpoint) alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) {
rInfo := make([]types.PerfSampleInfo, 0, len(info)) rInfo := make([]types.PerfSampleInfo, 0, len(info))
rValues := make([]float64, 0, len(values)) rValues := make([]float64, 0, len(values))
bi := 1.0 bi := 1.0
@ -1015,7 +1039,7 @@ func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Dur
// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
// data coming back with missing values. Take care of that gracefully! // data coming back with missing values. Take care of that gracefully!
if idx >= len(values) { if idx >= len(values) {
log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(info), len(values)) e.log.Debugf("len(SampleInfo)>len(Value) %d > %d during alignment", len(info), len(values))
break break
} }
v := float64(values[idx]) v := float64(values[idx])
@ -1044,8 +1068,8 @@ func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Dur
return rInfo, rValues return rInfo, rValues
} }
func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator, interval time.Duration) (int, time.Time, error) { func (e *Endpoint) collectChunk(ctx context.Context, pqs queryChunk, res *resourceKind, acc telegraf.Accumulator, now time.Time, interval time.Duration) (int, time.Time, error) {
e.Parent.Log.Debugf("Query for %s has %d QuerySpecs", res.name, len(pqs)) e.log.Debugf("Query for %s has %d QuerySpecs", res.name, len(pqs))
latestSample := time.Time{} latestSample := time.Time{}
count := 0 count := 0
resourceType := res.name resourceType := res.name
@ -1066,14 +1090,14 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
return count, latestSample, err return count, latestSample, err
} }
e.Parent.Log.Debugf("Query for %s returned metrics for %d objects", resourceType, len(ems)) e.log.Debugf("Query for %s returned metrics for %d objects", resourceType, len(ems))
// Iterate through results // Iterate through results
for _, em := range ems { for _, em := range ems {
moid := em.Entity.Reference().Value moid := em.Entity.Reference().Value
instInfo, found := res.objects[moid] instInfo, found := res.objects[moid]
if !found { if !found {
e.Parent.Log.Errorf("MOID %s not found in cache. Skipping! (This should not happen!)", moid) e.log.Errorf("MOID %s not found in cache. Skipping! (This should not happen!)", moid)
continue continue
} }
buckets := make(map[string]metricEntry) buckets := make(map[string]metricEntry)
@ -1088,19 +1112,19 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
// Populate tags // Populate tags
objectRef, ok := res.objects[moid] objectRef, ok := res.objects[moid]
if !ok { if !ok {
e.Parent.Log.Errorf("MOID %s not found in cache. Skipping", moid) e.log.Errorf("MOID %s not found in cache. Skipping", moid)
continue continue
} }
e.populateTags(objectRef, resourceType, res, t, &v) e.populateTags(objectRef, resourceType, res, t, &v)
nValues := 0 nValues := 0
alignedInfo, alignedValues := alignSamples(em.SampleInfo, v.Value, interval) alignedInfo, alignedValues := e.alignSamples(em.SampleInfo, v.Value, interval)
for idx, sample := range alignedInfo { for idx, sample := range alignedInfo {
// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
// data coming back with missing values. Take care of that gracefully! // data coming back with missing values. Take care of that gracefully!
if idx >= len(alignedValues) { if idx >= len(alignedValues) {
e.Parent.Log.Debugf("Len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues)) e.log.Debugf("Len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues))
break break
} }
ts := sample.Timestamp ts := sample.Timestamp
@ -1121,7 +1145,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
// Percentage values must be scaled down by 100. // Percentage values must be scaled down by 100.
info, ok := metricInfo[name] info, ok := metricInfo[name]
if !ok { if !ok {
e.Parent.Log.Errorf("Could not determine unit for %s. Skipping", name) e.log.Errorf("Could not determine unit for %s. Skipping", name)
} }
v := alignedValues[idx] v := alignedValues[idx]
if info.UnitInfo.GetElementDescription().Key == "percent" { if info.UnitInfo.GetElementDescription().Key == "percent" {
@ -1136,10 +1160,10 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
count++ count++
// Update highwater marks // Update highwater marks
e.hwMarks.Put(moid, ts) e.hwMarks.Put(moid, name, ts)
} }
if nValues == 0 { if nValues == 0 {
e.Parent.Log.Debugf("Missing value for: %s, %s", name, objectRef.name) e.log.Debugf("Missing value for: %s, %s", name, objectRef.name)
continue continue
} }
} }

View File

@ -10,7 +10,6 @@ import (
type TSCache struct { type TSCache struct {
ttl time.Duration ttl time.Duration
table map[string]time.Time table map[string]time.Time
done chan struct{}
mux sync.RWMutex mux sync.RWMutex
} }
@ -19,7 +18,6 @@ func NewTSCache(ttl time.Duration) *TSCache {
return &TSCache{ return &TSCache{
ttl: ttl, ttl: ttl,
table: make(map[string]time.Time), table: make(map[string]time.Time),
done: make(chan struct{}),
} }
} }
@ -39,10 +37,10 @@ func (t *TSCache) Purge() {
// IsNew returns true if the supplied timestamp for the supplied key is more recent than the // IsNew returns true if the supplied timestamp for the supplied key is more recent than the
// timestamp we have on record. // timestamp we have on record.
func (t *TSCache) IsNew(key string, tm time.Time) bool { func (t *TSCache) IsNew(key string, metricName string, tm time.Time) bool {
t.mux.RLock() t.mux.RLock()
defer t.mux.RUnlock() defer t.mux.RUnlock()
v, ok := t.table[key] v, ok := t.table[makeKey(key, metricName)]
if !ok { if !ok {
return true // We've never seen this before, so consider everything a new sample return true // We've never seen this before, so consider everything a new sample
} }
@ -50,16 +48,20 @@ func (t *TSCache) IsNew(key string, tm time.Time) bool {
} }
// Get returns a timestamp (if present) // Get returns a timestamp (if present)
func (t *TSCache) Get(key string) (time.Time, bool) { func (t *TSCache) Get(key string, metricName string) (time.Time, bool) {
t.mux.RLock() t.mux.RLock()
defer t.mux.RUnlock() defer t.mux.RUnlock()
ts, ok := t.table[key] ts, ok := t.table[makeKey(key, metricName)]
return ts, ok return ts, ok
} }
// Put updates the latest timestamp for the supplied key. // Put updates the latest timestamp for the supplied key.
func (t *TSCache) Put(key string, time time.Time) { func (t *TSCache) Put(key string, metricName string, time time.Time) {
t.mux.Lock() t.mux.Lock()
defer t.mux.Unlock() defer t.mux.Unlock()
t.table[key] = time t.table[makeKey(key, metricName)] = time
}
func makeKey(resource string, metric string) string {
return resource + "|" + metric
} }

View File

@ -275,7 +275,7 @@ func (v *VSphere) Start(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
ep, err := NewEndpoint(ctx, v, u) ep, err := NewEndpoint(ctx, v, u, v.Log)
if err != nil { if err != nil {
return err return err
} }

View File

@ -182,7 +182,8 @@ func testAlignUniform(t *testing.T, n int) {
} }
values[i] = 1 values[i] = 1
} }
newInfo, newValues := alignSamples(info, values, 60*time.Second) e := Endpoint{log: testutil.Logger{}}
newInfo, newValues := e.alignSamples(info, values, 60*time.Second)
require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size") require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size")
require.Equal(t, n/3, len(newValues), "Aligned values have wrong size") require.Equal(t, n/3, len(newValues), "Aligned values have wrong size")
for _, v := range newValues { for _, v := range newValues {
@ -207,7 +208,8 @@ func TestAlignMetrics(t *testing.T) {
} }
values[i] = int64(i%3 + 1) values[i] = int64(i%3 + 1)
} }
newInfo, newValues := alignSamples(info, values, 60*time.Second) e := Endpoint{log: testutil.Logger{}}
newInfo, newValues := e.alignSamples(info, values, 60*time.Second)
require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size") require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size")
require.Equal(t, n/3, len(newValues), "Aligned values have wrong size") require.Equal(t, n/3, len(newValues), "Aligned values have wrong size")
for _, v := range newValues { for _, v := range newValues {