From 2d782fbaacf195b0c1d97fe8a173a234e8ee6506 Mon Sep 17 00:00:00 2001
From: Pontus Rydin
Date: Tue, 6 Nov 2018 15:22:43 -0700
Subject: [PATCH] Fix potential missing datastore metrics in vSphere plugin
 (#4968)

---
 plugins/inputs/vsphere/README.md       |  2 +-
 plugins/inputs/vsphere/client.go       | 72 ++++++++++++++++++++++++--
 plugins/inputs/vsphere/endpoint.go     | 68 +++++++++++++++++++-----
 plugins/inputs/vsphere/tscache.go      | 57 ++++++++++++++++++++
 plugins/inputs/vsphere/vsphere.go      |  9 +++-
 plugins/inputs/vsphere/vsphere_test.go | 44 ++++++++++++++--
 6 files changed, 228 insertions(+), 24 deletions(-)
 create mode 100644 plugins/inputs/vsphere/tscache.go

diff --git a/plugins/inputs/vsphere/README.md b/plugins/inputs/vsphere/README.md
index b56393345..7ba323bc7 100644
--- a/plugins/inputs/vsphere/README.md
+++ b/plugins/inputs/vsphere/README.md
@@ -159,7 +159,7 @@ vm_metric_exclude = [ "*" ]
   # object_discovery_interval = "300s"
 
   ## timeout applies to any of the api request made to vcenter
-  # timeout = "20s"
+  # timeout = "60s"
 
   ## Optional SSL Config
   # ssl_ca = "/path/to/cafile"
diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go
index 5278cd349..ebad2bea7 100644
--- a/plugins/inputs/vsphere/client.go
+++ b/plugins/inputs/vsphere/client.go
@@ -5,10 +5,13 @@ import (
 	"crypto/tls"
 	"log"
 	"net/url"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/vmware/govmomi"
+	"github.com/vmware/govmomi/object"
 	"github.com/vmware/govmomi/performance"
 	"github.com/vmware/govmomi/session"
 	"github.com/vmware/govmomi/view"
@@ -17,6 +20,10 @@ import (
 	"github.com/vmware/govmomi/vim25/soap"
 )
 
+// The highest number of metrics we can query for, no matter what settings
+// and server say.
+const absoluteMaxMetrics = 10000
+
 // ClientFactory is used to obtain Clients to be used throughout the plugin. Typically,
 // a single Client is reused across all functions and goroutines, but the client
 // is periodically recycled to avoid authentication expiration issues.
@@ -79,6 +86,8 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
 // NewClient creates a new vSphere client based on the url and setting passed as parameters.
 func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
 	sw := NewStopwatch("connect", u.Host)
+	defer sw.Stop()
+
 	tlsCfg, err := vs.ClientConfig.TLSConfig()
 	if err != nil {
 		return nil, err
 	}
@@ -147,16 +156,27 @@ func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
 
 	p := performance.NewManager(c.Client)
 
-	sw.Stop()
-
-	return &Client{
+	client := &Client{
 		Client:  c,
 		Views:   m,
 		Root:    v,
 		Perf:    p,
 		Valid:   true,
 		Timeout: vs.Timeout.Duration,
-	}, nil
+	}
+	// Adjust max query size if needed
+	ctx3, cancel3 := context.WithTimeout(ctx, vs.Timeout.Duration)
+	defer cancel3()
+	n, err := client.GetMaxQueryMetrics(ctx3)
+	if err != nil {
+		return nil, err
+	}
+	log.Printf("D! [input.vsphere] vCenter says max_query_metrics should be %d", n)
+	if n < vs.MaxQueryMetrics {
+		log.Printf("W! [input.vsphere] Configured max_query_metrics is %d, but server limits it to %d. Reducing.", vs.MaxQueryMetrics, n)
+		vs.MaxQueryMetrics = n
+	}
+	return client, nil
 }
 
 // Close shuts down a ClientFactory and releases any resources associated with it.
@@ -191,3 +211,47 @@ func (c *Client) GetServerTime(ctx context.Context) (time.Time, error) {
 	}
 	return *t, nil
 }
+
+// GetMaxQueryMetrics returns the max_query_metrics setting as configured in vCenter
+func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) {
+	ctx, cancel := context.WithTimeout(ctx, c.Timeout)
+	defer cancel()
+
+	om := object.NewOptionManager(c.Client.Client, *c.Client.Client.ServiceContent.Setting)
+	res, err := om.Query(ctx, "config.vpxd.stats.maxQueryMetrics")
+	if err == nil {
+		if len(res) > 0 {
+			if s, ok := res[0].GetOptionValue().Value.(string); ok {
+				v, err := strconv.Atoi(s)
+				if err == nil {
+					log.Printf("D! [input.vsphere] vCenter maxQueryMetrics is defined: %d", v)
+					if v == -1 {
+						// Whatever the server says, we never ask for more metrics than this.
+						return absoluteMaxMetrics, nil
+					}
+					return v, nil
+				}
+			}
+			// Fall through version-based inference if value isn't usable
+		}
+	} else {
+		log.Println("I! [input.vsphere] Option query for maxQueryMetrics failed. Using default")
+	}
+
+	// No usable maxQueryMetrics setting. Infer based on version
+	ver := c.Client.Client.ServiceContent.About.Version
+	parts := strings.Split(ver, ".")
+	if len(parts) < 2 {
+		log.Printf("W! [input.vsphere] vCenter returned an invalid version string: %s. Using default query size=64", ver)
+		return 64, nil
+	}
+	log.Printf("D! [input.vsphere] vCenter version is: %s", ver)
+	major, err := strconv.Atoi(parts[0])
+	if err != nil {
+		return 0, err
+	}
+	if major < 6 || major == 6 && parts[1] == "0" {
+		return 64, nil
+	}
+	return 256, nil
+}
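The client.go changes above make the plugin negotiate its effective query size with vCenter: NewClient reads the server-side limit through GetMaxQueryMetrics, falling back to a version-based guess when the config.vpxd.stats.maxQueryMetrics option cannot be read, and lowers the configured max_query_metrics whenever the server allows less. A minimal stand-alone sketch of that clamping rule follows; the helper name effectiveMaxQueryMetrics and the small program around it are illustrative only, not part of the patch.

    package main

    import "fmt"

    // effectiveMaxQueryMetrics mirrors the decision made in NewClient: never ask
    // vCenter for more metrics per query than the server itself permits.
    func effectiveMaxQueryMetrics(configured, serverLimit int) int {
        if serverLimit < configured {
            return serverLimit
        }
        return configured
    }

    func main() {
        fmt.Println(effectiveMaxQueryMetrics(256, 64))  // config 256, server allows 64 -> 64
        fmt.Println(effectiveMaxQueryMetrics(100, 256)) // config below the server limit is kept -> 100
    }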
diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go
index 55444ebf3..dbc67dd95 100644
--- a/plugins/inputs/vsphere/endpoint.go
+++ b/plugins/inputs/vsphere/endpoint.go
@@ -24,6 +24,8 @@ import (
 
 var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$")
 
+const metricLookback = 3
+
 // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower
 // level Client type.
 type Endpoint struct {
@@ -32,6 +34,7 @@ type Endpoint struct {
 	lastColls       map[string]time.Time
 	instanceInfo    map[string]resourceInfo
 	resourceKinds   map[string]resourceKind
+	hwMarks         *TSCache
 	lun2ds          map[string]string
 	discoveryTicker *time.Ticker
 	collectMux      sync.RWMutex
@@ -96,6 +99,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
 		URL:          url,
 		Parent:       parent,
 		lastColls:    make(map[string]time.Time),
+		hwMarks:      NewTSCache(1 * time.Hour),
 		instanceInfo: make(map[string]resourceInfo),
 		lun2ds:       make(map[string]string),
 		initialized:  false,
@@ -353,8 +357,8 @@ func (e *Endpoint) discover(ctx context.Context) error {
 	// Populate resource objects, and endpoint instance info.
 	for k, res := range e.resourceKinds {
 		log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
-		// Need to do this for all resource types even if they are not enabled (but datastore)
-		if res.enabled || (k != "datastore" && k != "vm") {
+		// Need to do this for all resource types even if they are not enabled
+		if res.enabled || k != "vm" {
 			objects, err := res.getObjects(ctx, e, client.Root)
 			if err != nil {
 				return err
 			}
@@ -416,7 +420,6 @@ func (e *Endpoint) discover(ctx context.Context) error {
 			url := ds.altID
 			m := isolateLUN.FindStringSubmatch(url)
 			if m != nil {
-				log.Printf("D! [input.vsphere]: LUN: %s", m[1])
 				l2d[m[1]] = ds.name
 			}
 		}
@@ -539,7 +542,6 @@ func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (
 				url = info.Url
 			}
 		}
-		log.Printf("D! [input.vsphere]: DS URL: %s %s", url, r.Name)
 		m[r.ExtensibleManagedObject.Reference().Value] = objectRef{
 			name: r.Name, ref: r.ExtensibleManagedObject.Reference(), parentRef: r.Parent, altID: url}
 	}
@@ -584,10 +586,24 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error
 			}
 		}
 	}
+
+	// Purge old timestamps from the cache
+	e.hwMarks.Purge()
 	return nil
 }
 
 func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, now time.Time, latest time.Time) {
+	maxMetrics := e.Parent.MaxQueryMetrics
+	if maxMetrics < 1 {
+		maxMetrics = 1
+	}
+
+	// Workaround for vCenter weirdness. Cluster metrics seem to count multiple times
+	// when checking query size, so keep it at a low value.
+	// Revisit this when we better understand the reason why vCenter counts it this way!
+	if res.name == "cluster" && maxMetrics > 10 {
+		maxMetrics = 10
+	}
 	pqs := make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
 	metrics := 0
 	total := 0
@@ -600,7 +616,7 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
 		mr := len(info.metrics)
 		for mr > 0 {
 			mc := mr
-			headroom := e.Parent.MaxQueryMetrics - metrics
+			headroom := maxMetrics - metrics
 			if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics
 				mc = headroom
 			}
@@ -610,10 +626,19 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
 				MaxSample:  1,
 				MetricId:   info.metrics[fm : fm+mc],
 				IntervalId: res.sampling,
+				Format:     "normal",
 			}
 
+			// For non-realtime metrics, we need to look back a few samples in case
+			// the vCenter is late reporting metrics.
 			if !res.realTime {
-				pq.StartTime = &latest
+				pq.MaxSample = metricLookback
+			}
+
+			// Look back 3 sampling periods
+			start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
+			if !res.realTime {
+				pq.StartTime = &start
 				pq.EndTime = &now
 			}
 			pqs = append(pqs, pq)
@@ -623,8 +648,8 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
 			// We need to dump the current chunk of metrics for one of two reasons:
 			// 1) We filled up the metric quota while processing the current resource
 			// 2) We are at the last resource and have no more data to process.
-			if mr > 0 || (!res.realTime && metrics >= e.Parent.MaxQueryMetrics) || nRes >= e.Parent.MaxQueryObjects {
-				log.Printf("D! [input.vsphere]: Querying %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
+			if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects {
+				log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
 					len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))
 
 				// To prevent deadlocks, don't send work items if the context has been cancelled.
@@ -646,6 +671,8 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
 	//
 	if len(pqs) > 0 {
 		// Call push function
+		log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
+			len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
 		f(ctx, pqs)
 	}
 }
@@ -668,7 +695,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
 	log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
 	if !res.realTime && elapsed < float64(res.sampling) {
 		// No new data would be available. We're outta herE!
-		log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed for %s",
+		log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s",
 			resourceType, res.sampling, e.URL.Host)
 		return nil
 	}
@@ -679,7 +706,6 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
 	internalTags := map[string]string{"resourcetype": resourceType}
 	sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags)
 
-	log.Printf("D! [input.vsphere]: Start of sample period deemed to be %s", latest)
 	log.Printf("D! [input.vsphere]: Collecting metrics for %d objects of type %s for %s",
 		len(res.objects), resourceType, e.URL.Host)
 
@@ -690,7 +716,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
 	wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} {
 		chunk := in.([]types.PerfQuerySpec)
 		n, err := e.collectChunk(ctx, chunk, resourceType, res, acc)
-		log.Printf("D! [input.vsphere]: Query returned %d metrics", n)
+		log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
 		if err != nil {
 			return err
 		}
@@ -722,7 +748,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
 	sw.Stop()
 	SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count)
 	if len(merr) > 0 {
-		return err
+		return merr
 	}
 	return nil
 }
@@ -757,6 +783,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
 	if err != nil {
 		return count, err
 	}
+	log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems))
 
 	// Iterate through results
 	for _, em := range ems {
@@ -783,10 +810,18 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
 			}
 			e.populateTags(&objectRef, resourceType, &res, t, &v)
 
-			// Now deal with the values
-			for idx, value := range v.Value {
+			// Now deal with the values. Iterate backwards so we start with the latest value
+			tsKey := moid + "|" + name + "|" + v.Instance
+			for idx := len(v.Value) - 1; idx >= 0; idx-- {
 				ts := em.SampleInfo[idx].Timestamp
 
+				// Since non-realtime metrics are queries with a lookback, we need to check the high-water mark
+				// to determine if this should be included. Only samples not seen before should be included.
+				if !(res.realTime || e.hwMarks.IsNew(tsKey, ts)) {
+					continue
+				}
+				value := v.Value[idx]
+
 				// Organize the metrics into a bucket per measurement.
 				// Data SHOULD be presented to us with the same timestamp for all samples, but in case
 				// they don't we use the measurement name + timestamp as the key for the bucket.
@@ -813,6 +848,11 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
 					bucket.fields[fn] = value
 				}
 				count++
+
+				// Update highwater marks for non-realtime metrics.
+				if !res.realTime {
+					e.hwMarks.Put(tsKey, ts)
+				}
 			}
 		}
 		// We've iterated through all the metrics and collected buckets for each
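The endpoint.go changes above stop anchoring non-realtime queries exactly at the last collection time. Each query now looks back up to metricLookback samples so metrics that vCenter publishes late are still picked up, and the timestamp cache introduced in the next file filters out samples that were already reported. The sketch below shows roughly how the query window is derived; the sampling interval and last-collection time are made-up values for illustration.

    package main

    import (
        "fmt"
        "time"
    )

    // Same constant the patch adds to endpoint.go.
    const metricLookback = 3

    func main() {
        // Assumed values: a 5-minute (300 s) sampling interval, typical for
        // non-realtime vSphere metrics, and a last collection 10 minutes ago.
        sampling := int32(300)
        latest := time.Now().Add(-10 * time.Minute)

        // Same expression as in chunker(): start the query two full sampling
        // periods before the last collection time and accept up to
        // metricLookback samples per metric, newest first.
        start := latest.Add(time.Duration(-sampling) * time.Second * (metricLookback - 1))

        fmt.Printf("query window: %s .. now, up to %d samples per metric\n",
            start.Format(time.RFC3339), metricLookback)
    }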
diff --git a/plugins/inputs/vsphere/tscache.go b/plugins/inputs/vsphere/tscache.go
new file mode 100644
index 000000000..9abe24ea7
--- /dev/null
+++ b/plugins/inputs/vsphere/tscache.go
@@ -0,0 +1,57 @@
+package vsphere
+
+import (
+	"log"
+	"sync"
+	"time"
+)
+
+// TSCache is a cache of timestamps used to determine the validity of datapoints
+type TSCache struct {
+	ttl   time.Duration
+	table map[string]time.Time
+	done  chan struct{}
+	mux   sync.RWMutex
+}
+
+// NewTSCache creates a new TSCache with a specified time-to-live after which timestamps are discarded.
+func NewTSCache(ttl time.Duration) *TSCache {
+	return &TSCache{
+		ttl:   ttl,
+		table: make(map[string]time.Time),
+		done:  make(chan struct{}),
+	}
+}
+
+// Purge removes timestamps that are older than the time-to-live
+func (t *TSCache) Purge() {
+	t.mux.Lock()
+	defer t.mux.Unlock()
+	n := 0
+	for k, v := range t.table {
+		if time.Now().Sub(v) > t.ttl {
+			delete(t.table, k)
+			n++
+		}
+	}
+	log.Printf("D! [input.vsphere] Purged timestamp cache. %d deleted with %d remaining", n, len(t.table))
+}
+
+// IsNew returns true if the supplied timestamp for the supplied key is more recent than the
+// timestamp we have on record.
+func (t *TSCache) IsNew(key string, tm time.Time) bool {
+	t.mux.RLock()
+	defer t.mux.RUnlock()
+	v, ok := t.table[key]
+	if !ok {
+		return true // We've never seen this before, so consider everything a new sample
+	}
+	return !tm.Before(v)
+}
+
+// Put updates the latest timestamp for the supplied key.
+func (t *TSCache) Put(key string, time time.Time) {
+	t.mux.Lock()
+	defer t.mux.Unlock()
+	t.table[key] = time
+}
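TSCache keeps one high-water timestamp per moid|metric|instance key, which is how collectChunk decides whether a looked-back sample has already been emitted. A small usage sketch, assuming the telegraf module is available on the import path (inside the plugin package the vsphere qualifier would simply be dropped); the key and timestamps are invented for illustration:

    package main

    import (
        "fmt"
        "time"

        "github.com/influxdata/telegraf/plugins/inputs/vsphere"
    )

    func main() {
        // Keep timestamps for one hour, the same TTL NewEndpoint uses.
        cache := vsphere.NewTSCache(1 * time.Hour)

        key := "datastore-123|disk.used.latest|"
        t1 := time.Now().Add(-10 * time.Minute)
        t2 := t1.Add(5 * time.Minute)

        fmt.Println(cache.IsNew(key, t1)) // true: never seen, so the sample is accepted
        cache.Put(key, t1)

        fmt.Println(cache.IsNew(key, t2)) // true: newer than the recorded high-water mark
        cache.Put(key, t2)

        fmt.Println(cache.IsNew(key, t1)) // false: older sample, would be skipped

        cache.Purge() // drops entries whose timestamp is older than the TTL
    }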
diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go
index 26af1e8cc..f0bb5dca9 100644
--- a/plugins/inputs/vsphere/vsphere.go
+++ b/plugins/inputs/vsphere/vsphere.go
@@ -192,7 +192,7 @@ var sampleConfig = `
   # object_discovery_interval = "300s"
 
   ## timeout applies to any of the api request made to vcenter
-  # timeout = "20s"
+  # timeout = "60s"
 
   ## Optional SSL Config
   # ssl_ca = "/path/to/cafile"
@@ -260,6 +260,7 @@ func (v *VSphere) Stop() {
 // Gather is the main data collection function called by the Telegraf core. It performs all
 // the data collection and writes all metrics into the Accumulator passed as an argument.
 func (v *VSphere) Gather(acc telegraf.Accumulator) error {
+	merr := make(multiError, 0)
 	var wg sync.WaitGroup
 	for _, ep := range v.endpoints {
 		wg.Add(1)
@@ -273,11 +274,15 @@ func (v *VSphere) Gather(acc telegraf.Accumulator) error {
 			}
 			if err != nil {
 				acc.AddError(err)
+				merr = append(merr, err)
 			}
 		}(ep)
 	}
 	wg.Wait()
+	if len(merr) > 0 {
+		return merr
+	}
 	return nil
 }
 
@@ -306,7 +311,7 @@ func init() {
 			DiscoverConcurrency:     1,
 			ForceDiscoverOnInit:     false,
 			ObjectDiscoveryInterval: internal.Duration{Duration: time.Second * 300},
-			Timeout:                 internal.Duration{Duration: time.Second * 20},
+			Timeout:                 internal.Duration{Duration: time.Second * 60},
 		}
 	})
 }
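With the vsphere.go change above, Gather still hands every per-endpoint error to the accumulator but now also returns the collected errors, so a failed interval is visible to the Telegraf core. The sketch below illustrates that aggregation pattern; the multiError definition shown here is an assumption about the plugin's existing helper type (a plain error slice that implements error), which is defined elsewhere in the plugin and may differ in detail:

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    // multiError approximates the helper type the Gather change relies on:
    // a slice of errors that itself satisfies the error interface.
    type multiError []error

    func (e multiError) Error() string {
        parts := make([]string, 0, len(e))
        for _, err := range e {
            parts = append(parts, err.Error())
        }
        return strings.Join(parts, "; ")
    }

    func main() {
        // Mirror of the aggregation in Gather: collect every endpoint error,
        // then return them as one combined error value.
        merr := make(multiError, 0)
        for _, name := range []string{"vc1.example.org", "vc2.example.org"} {
            // Pretend both endpoints failed this interval.
            merr = append(merr, errors.New("collection failed for "+name))
        }
        if len(merr) > 0 {
            fmt.Println(merr.Error()) // both failures surface as a single error
        }
    }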
diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go
index 3290da2e9..4eb3d28f8 100644
--- a/plugins/inputs/vsphere/vsphere_test.go
+++ b/plugins/inputs/vsphere/vsphere_test.go
@@ -15,7 +15,9 @@ import (
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/influxdata/toml"
 	"github.com/stretchr/testify/require"
+	"github.com/vmware/govmomi/object"
 	"github.com/vmware/govmomi/simulator"
+	"github.com/vmware/govmomi/vim25/types"
 )
 
 var configHeader = `
@@ -187,8 +189,6 @@ func createSim() (*simulator.Model, *simulator.Server, error) {
 
 	model.Service.TLS = new(tls.Config)
 	s := model.Service.NewServer()
-	//fmt.Printf("Server created at: %s\n", s.URL)
-
 	return model, s, nil
 }
 
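The test changes above run against govmomi's built-in vCenter simulator rather than a live vCenter. Below is a stand-alone sketch of the pattern createSim follows, assuming simulator.VPX and Model.Create as the usual govmomi simulator entry points (the calls on model.Service and the server are the same ones visible in the hunk above; error handling is trimmed):

    package main

    import (
        "crypto/tls"
        "fmt"
        "log"

        "github.com/vmware/govmomi/simulator"
    )

    func main() {
        // Build an in-memory vCenter inventory (datacenters, hosts, VMs, datastores).
        model := simulator.VPX()
        if err := model.Create(); err != nil {
            log.Fatal(err)
        }
        defer model.Remove()

        // Serve it over TLS, just as createSim does in the test file.
        model.Service.TLS = new(tls.Config)
        server := model.Service.NewServer()
        defer server.Close()

        // server.URL is what the plugin's NewClient, or the vcenters setting,
        // would point at during a test run.
        fmt.Println("simulated vCenter at", server.URL)
    }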
@@ -244,13 +244,51 @@ func TestTimeout(t *testing.T) {
 	v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond}
 	require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil.
 	defer v.Stop()
-	require.NoError(t, v.Gather(&acc))
+	err = v.Gather(&acc)
+	require.NotNil(t, err, "Error should not be nil here")
 
 	// The accumulator must contain exactly one error and it must be a deadline exceeded.
 	require.Equal(t, 1, len(acc.Errors))
 	require.True(t, strings.Contains(acc.Errors[0].Error(), "context deadline exceeded"))
 }
 
+func TestMaxQuery(t *testing.T) {
+	m, s, err := createSim()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer m.Remove()
+	defer s.Close()
+
+	v := defaultVSphere()
+	v.MaxQueryMetrics = 256
+	ctx := context.Background()
+	c, err := NewClient(ctx, s.URL, v)
+	if err != nil {
+		t.Fatal(err)
+	}
+	require.Equal(t, 256, v.MaxQueryMetrics)
+
+	om := object.NewOptionManager(c.Client.Client, *c.Client.Client.ServiceContent.Setting)
+	err = om.Update(ctx, []types.BaseOptionValue{&types.OptionValue{
+		Key:   "config.vpxd.stats.maxQueryMetrics",
+		Value: "42",
+	}})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	v.MaxQueryMetrics = 256
+	ctx = context.Background()
+	c2, err := NewClient(ctx, s.URL, v)
+	if err != nil {
+		t.Fatal(err)
+	}
+	require.Equal(t, 42, v.MaxQueryMetrics)
+	c.close()
+	c2.close()
+}
+
 func TestAll(t *testing.T) {
 	m, s, err := createSim()
 	if err != nil {
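Assuming the usual Telegraf repository layout, the new test can be run on its own from the repository root with:

    go test -run TestMaxQuery ./plugins/inputs/vsphere/

TestMaxQuery exercises both directions of the negotiation: with the simulator's default settings the configured value of 256 survives, and after config.vpxd.stats.maxQueryMetrics is lowered to 42 through the OptionManager, a freshly created client reduces MaxQueryMetrics to 42.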