From d0db0e8f0a67cf071c0a2a62a8b4220e73b3e602 Mon Sep 17 00:00:00 2001
From: Pontus Rydin
Date: Tue, 21 Apr 2020 14:30:29 -0400
Subject: [PATCH] Fix vSphere 6.7 missing data issue (#7233)

---
 plugins/inputs/vsphere/endpoint.go     | 296 +++++++++++++------------
 plugins/inputs/vsphere/tscache.go      |  18 +-
 plugins/inputs/vsphere/vsphere.go      |   2 +-
 plugins/inputs/vsphere/vsphere_test.go |   6 +-
 4 files changed, 175 insertions(+), 147 deletions(-)

diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go
index c049e495f..a7d4db5ba 100644
--- a/plugins/inputs/vsphere/endpoint.go
+++ b/plugins/inputs/vsphere/endpoint.go
@@ -4,7 +4,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "log"
     "math"
     "math/rand"
     "net/url"
@@ -32,10 +31,18 @@ var isIPv6 = regexp.MustCompile("^(?:[A-Fa-f0-9]{0,4}:){1,7}[A-Fa-f0-9]{1,4}$")

 const metricLookback = 3 // Number of time periods to look back at for non-realtime metrics

-const maxSampleConst = 10 // Absolute maximim number of samples regardless of period
+const rtMetricLookback = 3 // Number of time periods to look back at for realtime metrics
+
+const maxSampleConst = 10 // Absolute maximum number of samples regardless of period

 const maxMetadataSamples = 100 // Number of resources to sample for metric metadata

+const hwMarkTTL = time.Duration(4 * time.Hour)
+
+type queryChunk []types.PerfQuerySpec
+
+type queryJob func(queryChunk)
+
 // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower
 // level Client type.
 type Endpoint struct {
@@ -52,6 +59,9 @@ type Endpoint struct {
     customFields      map[int32]string
     customAttrFilter  filter.Filter
     customAttrEnabled bool
+    metricNameLookup  map[int32]string
+    metricNameMux     sync.RWMutex
+    log               telegraf.Logger
 }

 type resourceKind struct {
@@ -107,16 +117,17 @@ func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, boo

 // NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed
 // as parameters.
-func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, error) {
+func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL, log telegraf.Logger) (*Endpoint, error) {
     e := Endpoint{
         URL: url,
         Parent: parent,
-        hwMarks: NewTSCache(1 * time.Hour),
+        hwMarks: NewTSCache(hwMarkTTL),
         lun2ds: make(map[string]string),
         initialized: false,
         clientFactory: NewClientFactory(ctx, url, parent),
         customAttrFilter: newFilterOrPanic(parent.CustomAttributeInclude, parent.CustomAttributeExclude),
         customAttrEnabled: anythingEnabled(parent.CustomAttributeExclude),
+        log: log,
     }

     e.resourceKinds = map[string]*resourceKind{
@@ -254,10 +265,10 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
             case <-e.discoveryTicker.C:
                 err := e.discover(ctx)
                 if err != nil && err != context.Canceled {
-                    e.Parent.Log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
+                    e.log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
                 }
             case <-ctx.Done():
-                e.Parent.Log.Debugf("Exiting discovery goroutine for %s", e.URL.Host)
+                e.log.Debugf("Exiting discovery goroutine for %s", e.URL.Host)
                 e.discoveryTicker.Stop()
                 return
             }
@@ -268,7 +279,7 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
 func (e *Endpoint) initalDiscovery(ctx context.Context) {
     err := e.discover(ctx)
     if err != nil && err != context.Canceled {
-        e.Parent.Log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
+        e.log.Errorf("Discovery for %s: %s", e.URL.Host, err.Error())
     }
     e.startDiscovery(ctx)
 }
@@ -283,7 +294,7 @@ func (e *Endpoint) init(ctx context.Context) error {
     if e.customAttrEnabled {
         fields, err := client.GetCustomFields(ctx)
         if err != nil {
-            e.Parent.Log.Warn("Could not load custom field metadata")
+            e.log.Warn("Could not load custom field metadata")
         } else {
             e.customFields = fields
         }
@@ -297,21 +308,29 @@ func (e *Endpoint) init(ctx context.Context) error {
     return nil
 }

-func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, error) {
+func (e *Endpoint) getMetricNameForId(id int32) string {
+    e.metricNameMux.RLock()
+    defer e.metricNameMux.RUnlock()
+    return e.metricNameLookup[id]
+}
+
+func (e *Endpoint) reloadMetricNameMap(ctx context.Context) error {
+    e.metricNameMux.Lock()
+    defer e.metricNameMux.Unlock()
     client, err := e.clientFactory.GetClient(ctx)
     if err != nil {
-        return nil, err
+        return err
     }

     mn, err := client.CounterInfoByName(ctx)
     if err != nil {
-        return nil, err
+        return err
     }
-    names := make(map[int32]string)
+    e.metricNameLookup = make(map[int32]string)
     for name, m := range mn {
-        names[m.Key] = name
+        e.metricNameLookup[m.Key] = name
     }
-    return names, nil
+    return nil
 }

 func (e *Endpoint) getMetadata(ctx context.Context, obj *objectRef, sampling int32) (performance.MetricList, error) {
@@ -377,7 +396,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
         return ctx.Err()
     }

-    metricNames, err := e.getMetricNameMap(ctx)
+    err := e.reloadMetricNameMap(ctx)
     if err != nil {
         return err
     }
@@ -389,7 +408,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
         return err
     }

-    e.Parent.Log.Debugf("Discover new objects for %s", e.URL.Host)
+    e.log.Debugf("Discover new objects for %s", e.URL.Host)
     dcNameCache := make(map[string]string)

    numRes := int64(0)
@@ -397,51 +416,47 @@ func (e *Endpoint) discover(ctx context.Context) error {

     // Populate resource objects, and endpoint instance info.
     newObjects := make(map[string]objectMap)
     for k, res := range e.resourceKinds {
-        err := func() error {
-            e.Parent.Log.Debugf("Discovering resources for %s", res.name)
-            // Need to do this for all resource types even if they are not enabled
-            if res.enabled || k != "vm" {
-                rf := ResourceFilter{
-                    finder: &Finder{client},
-                    resType: res.vcName,
-                    paths: res.paths,
-                    excludePaths: res.excludePaths,
-                }
+        e.log.Debugf("Discovering resources for %s", res.name)
+        // Need to do this for all resource types even if they are not enabled
+        if res.enabled || k != "vm" {
+            rf := ResourceFilter{
+                finder: &Finder{client},
+                resType: res.vcName,
+                paths: res.paths,
+                excludePaths: res.excludePaths}

-                ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
-                defer cancel1()
-                objects, err := res.getObjects(ctx1, e, &rf)
-                if err != nil {
-                    return err
-                }
+            ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
+            objects, err := res.getObjects(ctx1, e, &rf)
+            cancel1()
+            if err != nil {
+                return err
+            }

-                // Fill in datacenter names where available (no need to do it for Datacenters)
-                if res.name != "Datacenter" {
-                    for k, obj := range objects {
-                        if obj.parentRef != nil {
-                            obj.dcname = e.getDatacenterName(ctx, client, dcNameCache, *obj.parentRef)
-                            objects[k] = obj
-                        }
+            // Fill in datacenter names where available (no need to do it for Datacenters)
+            if res.name != "Datacenter" {
+                for k, obj := range objects {
+                    if obj.parentRef != nil {
+                        obj.dcname = e.getDatacenterName(ctx, client, dcNameCache, *obj.parentRef)
+                        objects[k] = obj
                     }
                 }
+            }

-                // No need to collect metric metadata if resource type is not enabled
-                if res.enabled {
-                    if res.simple {
-                        e.simpleMetadataSelect(ctx, client, res)
-                    } else {
-                        e.complexMetadataSelect(ctx, res, objects, metricNames)
-                    }
+            // No need to collect metric metadata if resource type is not enabled
+            if res.enabled {
+                if res.simple {
+                    e.simpleMetadataSelect(ctx, client, res)
+                } else {
+                    e.complexMetadataSelect(ctx, res, objects)
                 }
                 newObjects[k] = objects
                 SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects)))
                 numRes += int64(len(objects))
             }
-            return nil
-        }()
+        }
         if err != nil {
-            return err
+            e.log.Error(err)
         }
     }
@@ -461,7 +476,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
     if e.customAttrEnabled {
         fields, err = client.GetCustomFields(ctx)
         if err != nil {
-            e.Parent.Log.Warn("Could not load custom field metadata")
+            e.log.Warn("Could not load custom field metadata")
             fields = nil
         }
     }
@@ -485,10 +500,10 @@ func (e *Endpoint) discover(ctx context.Context) error {
 }

 func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) {
-    e.Parent.Log.Debugf("Using fast metric metadata selection for %s", res.name)
+    e.log.Debugf("Using fast metric metadata selection for %s", res.name)
     m, err := client.CounterInfoByName(ctx)
     if err != nil {
-        e.Parent.Log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
+        e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
         return
     }
     res.metrics = make(performance.MetricList, 0, len(res.include))
@@ -504,12 +519,12 @@ func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res
             }
             res.metrics = append(res.metrics, cnt)
         } else {
-            e.Parent.Log.Warnf("Metric name %s is unknown. Will not be collected", s)
+            e.log.Warnf("Metric name %s is unknown. Will not be collected", s)
         }
     }
 }

-func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) {
+func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap) {
     // We're only going to get metadata from maxMetadataSamples resources. If we have
     // more resources than that, we pick maxMetadataSamples samples at random.
     sampledObjects := make([]*objectRef, len(objects))
@@ -537,7 +552,7 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
         te.Run(ctx, func() {
             metrics, err := e.getMetadata(ctx, obj, res.sampling)
             if err != nil {
-                e.Parent.Log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
+                e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
             }
             mMap := make(map[string]types.PerfMetricId)
             for _, m := range metrics {
@@ -546,11 +561,11 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
                 } else {
                     m.Instance = ""
                 }
-                if res.filters.Match(metricNames[m.CounterId]) {
+                if res.filters.Match(e.getMetricNameForId(m.CounterId)) {
                     mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m
                 }
             }
-            e.Parent.Log.Debugf("Found %d metrics for %s", len(mMap), obj.name)
+            e.log.Debugf("Found %d metrics for %s", len(mMap), obj.name)
             instInfoMux.Lock()
             defer instInfoMux.Unlock()
             if len(mMap) > len(res.metrics) {
@@ -624,12 +639,6 @@ func getClusters(ctx context.Context, e *Endpoint, filter *ResourceFilter) (obje
                     cache[r.Parent.Value] = p
                 }
             }
-            m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
-                name: r.Name,
-                ref: r.ExtensibleManagedObject.Reference(),
-                parentRef: p,
-                customValues: e.loadCustomAttributes(&r.ManagedEntity),
-            }
             return nil
         }()
         if err != nil {
@@ -718,6 +727,23 @@ func getVMs(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap
             guest = cleanGuestID(r.Config.GuestId)
             uuid = r.Config.Uuid
         }
+        cvs := make(map[string]string)
+        if e.customAttrEnabled {
+            for _, cv := range r.Summary.CustomValue {
+                val := cv.(*types.CustomFieldStringValue)
+                if val.Value == "" {
+                    continue
+                }
+                key, ok := e.customFields[val.Key]
+                if !ok {
+                    e.log.Warnf("Metadata for custom field %d not found. Skipping", val.Key)
+                    continue
+                }
+                if e.customAttrFilter.Match(key) {
+                    cvs[key] = val.Value
+                }
+            }
+        }
         m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
             name: r.Name,
             ref: r.ExtensibleManagedObject.Reference(),
@@ -832,13 +858,13 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error
 }

 // Workaround to make sure pqs is a copy of the loop variable and won't change.
-func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) {
+func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job queryJob, pqs queryChunk) {
     te.Run(ctx, func() {
         job(pqs)
     })
 }

-func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, job func([]types.PerfQuerySpec)) {
+func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, acc telegraf.Accumulator, job queryJob) {
     te := NewThrottledExecutor(e.Parent.CollectConcurrency)
     maxMetrics := e.Parent.MaxQueryMetrics
     if maxMetrics < 1 {
@@ -851,54 +877,48 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
     if res.name == "cluster" && maxMetrics > 10 {
         maxMetrics = 10
     }
-    pqs := make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
-    metrics := 0
-    total := 0
-    nRes := 0
-    for _, resource := range res.objects {
-        mr := len(res.metrics)
-        for mr > 0 {
-            mc := mr
-            headroom := maxMetrics - metrics
-            if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics
-                mc = headroom
-            }
-            fm := len(res.metrics) - mr
-            pq := types.PerfQuerySpec{
-                Entity: resource.ref,
-                MaxSample: maxSampleConst,
-                MetricId: res.metrics[fm : fm+mc],
-                IntervalId: res.sampling,
-                Format: "normal",
-            }
-            start, ok := e.hwMarks.Get(resource.ref.Value)
+    pqs := make(queryChunk, 0, e.Parent.MaxQueryObjects)
+
+    for _, object := range res.objects {
+        timeBuckets := make(map[int64]*types.PerfQuerySpec, 0)
+        for metricIdx, metric := range res.metrics {
+
+            // Determine time of last successful collection
+            metricName := e.getMetricNameForId(metric.CounterId)
+            if metricName == "" {
+                e.log.Infof("Unable to find metric name for id %d. Skipping!", metric.CounterId)
+                continue
+            }
+            start, ok := e.hwMarks.Get(object.ref.Value, metricName)
             if !ok {
-                // Look back 3 sampling periods by default
                 start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
             }
-            pq.StartTime = &start
-            pq.EndTime = &now
+            start = start.Truncate(20 * time.Second) // Truncate to maximum resolution

-            // Make sure endtime is always after start time. We may occasionally see samples from the future
-            // returned from vCenter. This is presumably due to time drift between vCenter and EXSi nodes.
-            if pq.StartTime.After(*pq.EndTime) {
-                e.Parent.Log.Debugf("Future sample. Res: %s, StartTime: %s, EndTime: %s, Now: %s", pq.Entity, *pq.StartTime, *pq.EndTime, now)
-                end := start.Add(time.Second)
-                pq.EndTime = &end
+            // Create bucket if we don't already have it
+            bucket, ok := timeBuckets[start.Unix()]
+            if !ok {
+                bucket = &types.PerfQuerySpec{
+                    Entity: object.ref,
+                    MaxSample: maxSampleConst,
+                    MetricId: make([]types.PerfMetricId, 0),
+                    IntervalId: res.sampling,
+                    Format: "normal",
+                }
+                bucket.StartTime = &start
+                bucket.EndTime = &now
+                timeBuckets[start.Unix()] = bucket
             }
-            pqs = append(pqs, pq)
-            mr -= mc
-            metrics += mc
+            // Add this metric to the bucket
+            bucket.MetricId = append(bucket.MetricId, metric)

-            // We need to dump the current chunk of metrics for one of two reasons:
-            // 1) We filled up the metric quota while processing the current resource
-            // 2) We are at the last resource and have no more data to process.
-            // 3) The query contains more than 100,000 individual metrics
-            if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 {
-                e.Parent.Log.Debugf("Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
-                    len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))
+            // Bucket filled to capacity? (Only applies to non-realtime)
+            // OR if we're past the absolute maximum limit
+            if (!res.realTime && len(bucket.MetricId) >= maxMetrics) || len(bucket.MetricId) > 100000 {
+                e.log.Debugf("Submitting partial query: %d metrics (%d remaining) of type %s for %s. Total objects %d",
+                    len(bucket.MetricId), len(res.metrics)-metricIdx, res.name, e.URL.Host, len(res.objects))

                 // Don't send work items if the context has been cancelled.
                 if ctx.Err() == context.Canceled {
@@ -906,20 +926,23 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
                 }

                 // Run collection job
-                submitChunkJob(ctx, te, job, pqs)
-                pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
-                metrics = 0
-                nRes = 0
+                delete(timeBuckets, start.Unix())
+                submitChunkJob(ctx, te, job, queryChunk{*bucket})
+            }
+        }
+        // Handle data in time bucket and submit job if we've reached the maximum number of objects.
+        for _, bucket := range timeBuckets {
+            pqs = append(pqs, *bucket)
+            if (!res.realTime && len(pqs) > e.Parent.MaxQueryObjects) || len(pqs) > 100000 {
+                e.log.Debugf("Submitting final bucket job for %s: %d metrics", res.name, len(bucket.MetricId))
+                submitChunkJob(ctx, te, job, pqs)
+                pqs = make(queryChunk, 0, e.Parent.MaxQueryObjects)
             }
         }
-        total++
-        nRes++
     }
-    // Handle final partially filled chunk
+    // Submit any jobs left in the queue
     if len(pqs) > 0 {
-        // Run collection job
-        e.Parent.Log.Debugf("Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
-            len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
+        e.log.Debugf("Submitting job for %s: %d objects", res.name, len(pqs))
         submitChunkJob(ctx, te, job, pqs)
     }
@@ -950,18 +973,18 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
         if estInterval < s {
             estInterval = s
         }
-        e.Parent.Log.Debugf("Raw interval %s, padded: %s, estimated: %s", rawInterval, paddedInterval, estInterval)
+        e.log.Debugf("Raw interval %s, padded: %s, estimated: %s", rawInterval, paddedInterval, estInterval)
     }
-    e.Parent.Log.Debugf("Interval estimated to %s", estInterval)
+    e.log.Debugf("Interval estimated to %s", estInterval)
     res.lastColl = localNow

     latest := res.latestSample
     if !latest.IsZero() {
         elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter.
-        e.Parent.Log.Debugf("Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
+        e.log.Debugf("Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
         if !res.realTime && elapsed < float64(res.sampling) {
             // No new data would be available. We're outta here!
-            e.Parent.Log.Debugf("Sampling period for %s of %d has not elapsed on %s",
+            e.log.Debugf("Sampling period for %s of %d has not elapsed on %s",
                 resourceType, res.sampling, e.URL.Host)
             return nil
         }
@@ -972,7 +995,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
     internalTags := map[string]string{"resourcetype": resourceType}
     sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags)

-    e.Parent.Log.Debugf("Collecting metrics for %d objects of type %s for %s",
+    e.log.Debugf("Collecting metrics for %d objects of type %s for %s",
         len(res.objects), resourceType, e.URL.Host)

     count := int64(0)
@@ -981,9 +1004,10 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
     latestSample := time.Time{}

     // Divide workload into chunks and process them concurrently
-    e.chunkify(ctx, res, now, latest,
-        func(chunk []types.PerfQuerySpec) {
-            n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, estInterval)
+    e.chunkify(ctx, res, now, latest, acc,
+        func(chunk queryChunk) {
+            n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, now, estInterval)
+            e.log.Debugf("CollectChunk for %s returned %d metrics", resourceType, n)
             if err != nil {
                 acc.AddError(errors.New("while collecting " + res.name + ": " + err.Error()))
                 return
@@ -997,7 +1021,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
             }
         })

-    e.Parent.Log.Debugf("Latest sample for %s set to %s", resourceType, latestSample)
+    e.log.Debugf("Latest sample for %s set to %s", resourceType, latestSample)
     if !latestSample.IsZero() {
         res.latestSample = latestSample
     }
@@ -1006,7 +1030,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
     return nil
 }

-func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) {
+func (e *Endpoint) alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) {
     rInfo := make([]types.PerfSampleInfo, 0, len(info))
     rValues := make([]float64, 0, len(values))
     bi := 1.0
@@ -1015,7 +1039,7 @@ func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Dur
         // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
         // data coming back with missing values. Take care of that gracefully!
         if idx >= len(values) {
-            log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(info), len(values))
+            e.log.Debugf("len(SampleInfo)>len(Value) %d > %d during alignment", len(info), len(values))
             break
         }
         v := float64(values[idx])
@@ -1044,8 +1068,8 @@ func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Dur
     return rInfo, rValues
 }

-func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator, interval time.Duration) (int, time.Time, error) {
-    e.Parent.Log.Debugf("Query for %s has %d QuerySpecs", res.name, len(pqs))
+func (e *Endpoint) collectChunk(ctx context.Context, pqs queryChunk, res *resourceKind, acc telegraf.Accumulator, now time.Time, interval time.Duration) (int, time.Time, error) {
+    e.log.Debugf("Query for %s has %d QuerySpecs", res.name, len(pqs))
     latestSample := time.Time{}
     count := 0
     resourceType := res.name
@@ -1066,14 +1090,14 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
         return count, latestSample, err
     }

-    e.Parent.Log.Debugf("Query for %s returned metrics for %d objects", resourceType, len(ems))
+    e.log.Debugf("Query for %s returned metrics for %d objects", resourceType, len(ems))

     // Iterate through results
     for _, em := range ems {
         moid := em.Entity.Reference().Value
         instInfo, found := res.objects[moid]
         if !found {
-            e.Parent.Log.Errorf("MOID %s not found in cache. Skipping! (This should not happen!)", moid)
+            e.log.Errorf("MOID %s not found in cache. Skipping! (This should not happen!)", moid)
             continue
         }
         buckets := make(map[string]metricEntry)
@@ -1088,19 +1112,19 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
             // Populate tags
             objectRef, ok := res.objects[moid]
             if !ok {
-                e.Parent.Log.Errorf("MOID %s not found in cache. Skipping", moid)
+                e.log.Errorf("MOID %s not found in cache. Skipping", moid)
                 continue
             }
             e.populateTags(objectRef, resourceType, res, t, &v)

             nValues := 0
-            alignedInfo, alignedValues := alignSamples(em.SampleInfo, v.Value, interval)
+            alignedInfo, alignedValues := e.alignSamples(em.SampleInfo, v.Value, interval)

             for idx, sample := range alignedInfo {
                 // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
                 // data coming back with missing values. Take care of that gracefully!
                 if idx >= len(alignedValues) {
-                    e.Parent.Log.Debugf("Len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues))
+                    e.log.Debugf("Len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues))
                     break
                 }
                 ts := sample.Timestamp
@@ -1121,7 +1145,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
                 // Percentage values must be scaled down by 100.
                 info, ok := metricInfo[name]
                 if !ok {
-                    e.Parent.Log.Errorf("Could not determine unit for %s. Skipping", name)
+                    e.log.Errorf("Could not determine unit for %s. Skipping", name)
                 }
                 v := alignedValues[idx]
                 if info.UnitInfo.GetElementDescription().Key == "percent" {
@@ -1136,10 +1160,10 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
                     count++

                     // Update highwater marks
-                    e.hwMarks.Put(moid, ts)
+                    e.hwMarks.Put(moid, name, ts)
                 }
                 if nValues == 0 {
-                    e.Parent.Log.Debugf("Missing value for: %s, %s", name, objectRef.name)
+                    e.log.Debugf("Missing value for: %s, %s", name, objectRef.name)
                     continue
                 }
             }
diff --git a/plugins/inputs/vsphere/tscache.go b/plugins/inputs/vsphere/tscache.go
index 6e7d00c8b..1be75d760 100644
--- a/plugins/inputs/vsphere/tscache.go
+++ b/plugins/inputs/vsphere/tscache.go
@@ -10,7 +10,6 @@ import (
 type TSCache struct {
     ttl time.Duration
     table map[string]time.Time
-    done chan struct{}
     mux sync.RWMutex
 }

@@ -19,7 +18,6 @@ func NewTSCache(ttl time.Duration) *TSCache {
     return &TSCache{
         ttl: ttl,
         table: make(map[string]time.Time),
-        done: make(chan struct{}),
     }
 }

@@ -39,10 +37,10 @@ func (t *TSCache) Purge() {

 // IsNew returns true if the supplied timestamp for the supplied key is more recent than the
 // timestamp we have on record.
-func (t *TSCache) IsNew(key string, tm time.Time) bool {
+func (t *TSCache) IsNew(key string, metricName string, tm time.Time) bool {
     t.mux.RLock()
     defer t.mux.RUnlock()
-    v, ok := t.table[key]
+    v, ok := t.table[makeKey(key, metricName)]
     if !ok {
         return true // We've never seen this before, so consider everything a new sample
     }
@@ -50,16 +48,20 @@ func (t *TSCache) IsNew(key string, tm time.Time) bool {
 }

 // Get returns a timestamp (if present)
-func (t *TSCache) Get(key string) (time.Time, bool) {
+func (t *TSCache) Get(key string, metricName string) (time.Time, bool) {
     t.mux.RLock()
     defer t.mux.RUnlock()
-    ts, ok := t.table[key]
+    ts, ok := t.table[makeKey(key, metricName)]
     return ts, ok
 }

 // Put updates the latest timestamp for the supplied key.
-func (t *TSCache) Put(key string, time time.Time) {
+func (t *TSCache) Put(key string, metricName string, time time.Time) {
     t.mux.Lock()
     defer t.mux.Unlock()
-    t.table[key] = time
+    t.table[makeKey(key, metricName)] = time
+}
+
+func makeKey(resource string, metric string) string {
+    return resource + "|" + metric
 }
diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go
index 141b25599..098c49334 100644
--- a/plugins/inputs/vsphere/vsphere.go
+++ b/plugins/inputs/vsphere/vsphere.go
@@ -275,7 +275,7 @@ func (v *VSphere) Start(acc telegraf.Accumulator) error {
         if err != nil {
             return err
         }
-        ep, err := NewEndpoint(ctx, v, u)
+        ep, err := NewEndpoint(ctx, v, u, v.Log)
         if err != nil {
             return err
         }
diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go
index dce21fa78..fe0dfe41e 100644
--- a/plugins/inputs/vsphere/vsphere_test.go
+++ b/plugins/inputs/vsphere/vsphere_test.go
@@ -182,7 +182,8 @@ func testAlignUniform(t *testing.T, n int) {
         }
         values[i] = 1
     }
-    newInfo, newValues := alignSamples(info, values, 60*time.Second)
+    e := Endpoint{log: testutil.Logger{}}
+    newInfo, newValues := e.alignSamples(info, values, 60*time.Second)
     require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size")
     require.Equal(t, n/3, len(newValues), "Aligned values have wrong size")
     for _, v := range newValues {
@@ -207,7 +208,8 @@ func TestAlignMetrics(t *testing.T) {
         }
         values[i] = int64(i%3 + 1)
     }
-    newInfo, newValues := alignSamples(info, values, 60*time.Second)
+    e := Endpoint{log: testutil.Logger{}}
+    newInfo, newValues := e.alignSamples(info, values, 60*time.Second)
     require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size")
     require.Equal(t, n/3, len(newValues), "Aligned values have wrong size")
     for _, v := range newValues {
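
Illustration (not part of the patch): the core of this fix is that tscache.go now keeps one high-water mark per resource/metric pair via makeKey (resource + "|" + metric), so a metric with no recent samples no longer pins the query start time used for every other metric on the same object. A minimal, self-contained Go sketch of that keying idea follows; the hwMarks type and its methods are hypothetical stand-ins, not part of the plugin:

    package main

    import (
        "fmt"
        "time"
    )

    // hwMarks mirrors the patch's TSCache keying: one timestamp per
    // (resource, metric) pair instead of one per resource.
    type hwMarks map[string]time.Time

    // makeKey follows the same composite-key scheme as tscache.go.
    func makeKey(resource, metric string) string { return resource + "|" + metric }

    func (h hwMarks) put(resource, metric string, ts time.Time) {
        h[makeKey(resource, metric)] = ts
    }

    func (h hwMarks) get(resource, metric string) (time.Time, bool) {
        ts, ok := h[makeKey(resource, metric)]
        return ts, ok
    }

    func main() {
        h := hwMarks{}
        h.put("vm-42", "cpu.usage.average", time.Now())
        // A different metric on the same object has no mark yet, so a collector
        // built on this cache would fall back to its default lookback window
        // for that metric only.
        if _, ok := h.get("vm-42", "mem.usage.average"); !ok {
            fmt.Println("no high-water mark yet; use default lookback")
        }
    }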