Improve scalability of vsphere input (#5113)

This commit is contained in:
Pontus Rydin
2018-12-28 16:24:43 -05:00
committed by Daniel Nelson
parent 1d6ff4fe4c
commit 78c1ffbf27
10 changed files with 482 additions and 398 deletions

View File

@@ -2,8 +2,10 @@ package vsphere
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"net/url"
"regexp"
"strconv"
@@ -24,15 +26,19 @@ import (
var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$")
const metricLookback = 3
// Tuning constants for metric queries and metadata discovery.
const (
	// metricLookback is the number of time periods to look back at for non-realtime metrics.
	metricLookback = 3

	// rtMetricLookback is the number of time periods to look back at for realtime metrics.
	rtMetricLookback = 3

	// maxSampleConst is the absolute maximum number of samples regardless of period.
	maxSampleConst = 10

	// maxMetadataSamples is the number of resources to sample for metric metadata.
	maxMetadataSamples = 100
)
// Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower
// level Client type.
type Endpoint struct {
Parent *VSphere
URL *url.URL
lastColls map[string]time.Time
instanceInfo map[string]resourceInfo
resourceKinds map[string]resourceKind
hwMarks *TSCache
lun2ds map[string]string
@@ -52,8 +58,14 @@ type resourceKind struct {
sampling int32
objects objectMap
filters filter.Filter
include []string
simple bool
metrics performance.MetricList
collectInstances bool
getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error)
parent string
getObjects func(context.Context, *Client, *Endpoint, *view.ContainerView) (objectMap, error)
latestSample time.Time
lastColl time.Time
}
type metricEntry struct {
@@ -74,33 +86,22 @@ type objectRef struct {
dcname string
}
type resourceInfo struct {
name string
metrics performance.MetricList
parentRef *types.ManagedObjectReference
// getParent resolves the parent object of obj. The parent is looked up by
// obj.parentRef.Value among the objects of the resource kind named by
// res.parent.
//
// It returns a pointer to a copy of the parent entry and true when the parent
// is found. It returns nil and false when obj or res is nil, when obj has no
// parent reference, when res.parent does not name a known resource kind, or
// when the referenced object is not among that kind's objects.
func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) {
	// The parent reference is a pointer and may be unset; dereferencing it
	// unconditionally would panic.
	if obj == nil || res == nil || obj.parentRef == nil {
		return nil, false
	}
	pKind, ok := e.resourceKinds[res.parent]
	if !ok {
		return nil, false
	}
	p, ok := pKind.objects[obj.parentRef.Value]
	if !ok {
		return nil, false
	}
	return &p, true
}
// metricQRequest is a metadata query work item: it pairs one object with the
// resource kind it was discovered under.
type metricQRequest struct {
// res is the resource kind the object belongs to; its sampling value is used
// when querying the object's available metrics.
res *resourceKind
// obj is the object whose available metrics are queried.
obj objectRef
}
// metricQResponse is the result of a metadata query for a single object.
type metricQResponse struct {
// obj is the object the query was made for.
obj objectRef
// metrics is the list of metrics reported as available for obj.
metrics *performance.MetricList
}
// multiError is a list of errors that can be reported as a single error value.
type multiError []error
// NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed
// as parameters.
func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, error) {
e := Endpoint{
URL: url,
Parent: parent,
lastColls: make(map[string]time.Time),
hwMarks: NewTSCache(1 * time.Hour),
instanceInfo: make(map[string]resourceInfo),
lun2ds: make(map[string]string),
initialized: false,
clientFactory: NewClientFactory(ctx, url, parent),
@@ -116,8 +117,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
simple: isSimple(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
include: parent.DatacenterMetricInclude,
collectInstances: parent.DatacenterInstances,
getObjects: getDatacenters,
parent: "",
},
"cluster": {
name: "cluster",
@@ -128,8 +132,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
simple: isSimple(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
include: parent.ClusterMetricInclude,
collectInstances: parent.ClusterInstances,
getObjects: getClusters,
parent: "datacenter",
},
"host": {
name: "host",
@@ -140,8 +147,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude),
simple: isSimple(parent.HostMetricInclude, parent.HostMetricExclude),
include: parent.HostMetricInclude,
collectInstances: parent.HostInstances,
getObjects: getHosts,
parent: "cluster",
},
"vm": {
name: "vm",
@@ -152,8 +162,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude),
simple: isSimple(parent.VMMetricInclude, parent.VMMetricExclude),
include: parent.VMMetricInclude,
collectInstances: parent.VMInstances,
getObjects: getVMs,
parent: "host",
},
"datastore": {
name: "datastore",
@@ -163,8 +176,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
simple: isSimple(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
include: parent.DatastoreMetricInclude,
collectInstances: parent.DatastoreInstances,
getObjects: getDatastores,
parent: "",
},
}
@@ -174,24 +190,6 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
return &e, err
}
// Error renders the collected errors as one message. A single error is
// returned verbatim; several errors are listed after a common prefix,
// separated by ", ". An empty list yields a message flagging the
// inconsistency.
func (m multiError) Error() string {
	if len(m) == 0 {
		return "No error recorded. Something is wrong!"
	}
	if len(m) == 1 {
		return m[0].Error()
	}
	msgs := make([]string, len(m))
	for idx, err := range m {
		msgs[idx] = err.Error()
	}
	return "Multiple errors detected concurrently: " + strings.Join(msgs, ", ")
}
func anythingEnabled(ex []string) bool {
for _, s := range ex {
if s == "*" {
@@ -209,6 +207,18 @@ func newFilterOrPanic(include []string, exclude []string) filter.Filter {
return f
}
// isSimple reports whether a metric selection consists solely of literal
// metric names, i.e. there are no exclusions, at least one inclusion, and no
// inclusion contains a glob metacharacter. Such a selection can be resolved by
// direct name lookup rather than by filter matching.
//
// All glob metacharacters ('*', '?' and '[') are checked, not just '*':
// a pattern such as "cpu.usage.?" must go through filter matching, otherwise
// it would be looked up as a literal metric name and never match.
func isSimple(include []string, exclude []string) bool {
	if len(exclude) > 0 || len(include) == 0 {
		return false
	}
	for _, s := range include {
		if strings.ContainsAny(s, "*?[") {
			return false
		}
	}
	return true
}
func (e *Endpoint) startDiscovery(ctx context.Context) {
e.discoveryTicker = time.NewTicker(e.Parent.ObjectDiscoveryInterval.Duration)
go func() {
@@ -249,7 +259,9 @@ func (e *Endpoint) init(ctx context.Context) error {
} else {
// Otherwise, just run it in the background. We'll probably have an incomplete first metric
// collection this way.
go e.initalDiscovery(ctx)
go func() {
e.initalDiscovery(ctx)
}()
}
}
e.initialized = true
@@ -262,10 +274,7 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro
return nil, err
}
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
mn, err := client.Perf.CounterInfoByName(ctx1)
mn, err := client.CounterInfoByName(ctx)
if err != nil {
return nil, err
}
@@ -276,20 +285,19 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro
return names, nil
}
func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{} {
func (e *Endpoint) getMetadata(ctx context.Context, obj objectRef, sampling int32) (performance.MetricList, error) {
client, err := e.clientFactory.GetClient(ctx)
if err != nil {
return err
return nil, err
}
rq := in.(*metricQRequest)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling)
if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
metrics, err := client.Perf.AvailableMetric(ctx1, obj.ref.Reference(), sampling)
if err != nil {
return nil, err
}
return &metricQResponse{metrics: &metrics, obj: rq.obj}
return metrics, nil
}
func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache map[string]string, r types.ManagedObjectReference) string {
@@ -349,17 +357,17 @@ func (e *Endpoint) discover(ctx context.Context) error {
}
log.Printf("D! [input.vsphere]: Discover new objects for %s", e.URL.Host)
instInfo := make(map[string]resourceInfo)
resourceKinds := make(map[string]resourceKind)
dcNameCache := make(map[string]string)
numRes := int64(0)
// Populate resource objects, and endpoint instance info.
for k, res := range e.resourceKinds {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled
if res.enabled || k != "vm" {
objects, err := res.getObjects(ctx, e, client.Root)
objects, err := res.getObjects(ctx, client, e, client.Root)
if err != nil {
return err
}
@@ -374,42 +382,19 @@ func (e *Endpoint) discover(ctx context.Context) error {
}
}
// Set up a worker pool for processing metadata queries concurrently
wp := NewWorkerPool(10)
wp.Run(ctx, e.getMetadata, e.Parent.DiscoverConcurrency)
// Fill the input channels with resources that need to be queried
// for metadata.
wp.Fill(ctx, func(ctx context.Context, f PushFunc) {
for _, obj := range objects {
f(ctx, &metricQRequest{obj: obj, res: &res})
// No need to collect metric metadata if resource type is not enabled
if res.enabled {
if res.simple {
e.simpleMetadataSelect(ctx, client, &res)
} else {
e.complexMetadataSelect(ctx, &res, objects, metricNames)
}
})
// Drain the resulting metadata and build instance infos.
wp.Drain(ctx, func(ctx context.Context, in interface{}) bool {
switch resp := in.(type) {
case *metricQResponse:
mList := make(performance.MetricList, 0)
if res.enabled {
for _, m := range *resp.metrics {
if m.Instance != "" && !res.collectInstances {
continue
}
if res.filters.Match(metricNames[m.CounterId]) {
mList = append(mList, m)
}
}
}
instInfo[resp.obj.ref.Value] = resourceInfo{name: resp.obj.name, metrics: mList, parentRef: resp.obj.parentRef}
case error:
log.Printf("W! [input.vsphere]: Error while discovering resources: %s", resp)
return false
}
return true
})
}
res.objects = objects
resourceKinds[k] = res
SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects)))
numRes += int64(len(objects))
}
}
@@ -428,20 +413,100 @@ func (e *Endpoint) discover(ctx context.Context) error {
e.collectMux.Lock()
defer e.collectMux.Unlock()
e.instanceInfo = instInfo
e.resourceKinds = resourceKinds
e.lun2ds = l2d
sw.Stop()
SendInternalCounter("discovered_objects", e.URL.Host, int64(len(instInfo)))
SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": "instance-total"}, numRes)
return nil
}
func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
// simpleMetadataSelect fills res.metrics by looking up every name in
// res.include in the counter-by-name table obtained from the client. Names
// missing from the table are logged and skipped. Each selected metric carries
// the instance selector "*" when res.collectInstances is set and "" otherwise.
// If the counter table cannot be obtained, the error is logged and res.metrics
// is left untouched.
func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) {
	log.Printf("D! [input.vsphere] Using fast metric metadata selection for %s", res.name)
	counters, err := client.CounterInfoByName(ctx)
	if err != nil {
		log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
		return
	}

	// The instance selector is the same for every metric of this resource kind.
	instance := ""
	if res.collectInstances {
		instance = "*"
	}

	res.metrics = make(performance.MetricList, 0, len(res.include))
	for _, name := range res.include {
		info, known := counters[name]
		if !known {
			log.Printf("W! [input.vsphere] Metric name %s is unknown. Will not be collected", name)
			continue
		}
		res.metrics = append(res.metrics, types.PerfMetricId{
			CounterId: info.Key,
			Instance:  instance,
		})
	}
}
// complexMetadataSelect determines res.metrics for a resource kind whose
// metric selection requires filter matching. It queries the available metrics
// of up to maxMetadataSamples of the given objects, keeps the metrics whose
// name (resolved through metricNames) matches res.filters, and stores the
// largest such set found on any sampled object in res.metrics.
//
// Metric instances are collapsed: a metric with a non-empty instance becomes
// the wildcard instance "*" when res.collectInstances is set; in all other
// cases the instance is cleared. Queries are submitted to a ThrottledExecutor
// created with e.Parent.DiscoverConcurrency, and the function returns once the
// executor reports that all of them have finished.
func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) {
	// We're only going to get metadata from maxMetadataSamples resources. If we have
	// more resources than that, we pick maxMetadataSamples samples at random.
	sampledObjects := make([]objectRef, 0, len(objects))
	for _, obj := range objects {
		sampledObjects = append(sampledObjects, obj)
	}
	if n := len(sampledObjects); n > maxMetadataSamples {
		// Partial Fisher-Yates shuffle: position i receives an element chosen
		// uniformly from the not-yet-placed tail [i, n), so every object has
		// the same chance of ending up in the first maxMetadataSamples slots.
		for i := 0; i < maxMetadataSamples; i++ {
			j := i + rand.Intn(n-i)
			sampledObjects[i], sampledObjects[j] = sampledObjects[j], sampledObjects[i]
		}
		sampledObjects = sampledObjects[:maxMetadataSamples]
	}

	// metricsMux guards res.metrics, which is updated from the executor's jobs.
	var metricsMux sync.Mutex
	te := NewThrottledExecutor(e.Parent.DiscoverConcurrency)
	for _, obj := range sampledObjects {
		obj := obj // Give each job its own copy of the loop variable.
		te.Run(ctx, func() {
			metrics, err := e.getMetadata(ctx, obj, res.sampling)
			if err != nil {
				log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
				// Nothing was retrieved for this object, so it cannot contribute.
				return
			}

			// Deduplicate on counter id + collapsed instance.
			mMap := make(map[string]types.PerfMetricId)
			for _, m := range metrics {
				if m.Instance != "" && res.collectInstances {
					m.Instance = "*"
				} else {
					m.Instance = ""
				}
				if res.filters.Match(metricNames[m.CounterId]) {
					mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m
				}
			}
			log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name)

			metricsMux.Lock()
			defer metricsMux.Unlock()
			// Keep the largest metric set seen so far.
			if len(mMap) > len(res.metrics) {
				res.metrics = make(performance.MetricList, 0, len(mMap))
				for _, m := range mMap {
					res.metrics = append(res.metrics, m)
				}
			}
		})
	}
	te.Wait()
}
func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datacenter
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
err := client.ListResources(ctx, root, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
@@ -453,11 +518,9 @@ func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView)
return m, nil
}
func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.ClusterComputeResource
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
err := client.ListResources(ctx, root, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
@@ -487,9 +550,9 @@ func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (ob
return m, nil
}
func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getHosts(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.HostSystem
err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
err := client.ListResources(ctx, root, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
@@ -501,16 +564,17 @@ func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objec
return m, nil
}
func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.VirtualMachine
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
err := client.ListResources(ctx, root, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources)
if err != nil {
return nil, err
}
m := make(objectMap)
for _, r := range resources {
if r.Runtime.PowerState != "poweredOn" {
continue
}
guest := "unknown"
uuid := ""
// Sometimes Config is unknown and returns a nil pointer
@@ -525,11 +589,9 @@ func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectM
return m, nil
}
func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getDatastores(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datastore
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources)
err := client.ListResources(ctx, root, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources)
if err != nil {
return nil, err
}
@@ -555,10 +617,10 @@ func (e *Endpoint) Close() {
// Collect runs a round of data collections as specified in the configuration.
func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error {
// If we never managed to do a discovery, collection will be a no-op. Therefore,
// we need to check that a connection is available, or the collection will
// silently fail.
//
if _, err := e.clientFactory.GetClient(ctx); err != nil {
return err
}
@@ -571,28 +633,41 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error
}
// If discovery interval is disabled (0), discover on each collection cycle
//
if e.Parent.ObjectDiscoveryInterval.Duration == 0 {
err := e.discover(ctx)
if err != nil {
return err
}
}
var wg sync.WaitGroup
for k, res := range e.resourceKinds {
if res.enabled {
err := e.collectResource(ctx, k, acc)
if err != nil {
return err
}
wg.Add(1)
go func(k string) {
defer wg.Done()
err := e.collectResource(ctx, k, acc)
if err != nil {
acc.AddError(err)
}
}(k)
}
}
wg.Wait()
// Purge old timestamps from the cache
e.hwMarks.Purge()
return nil
}
func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, now time.Time, latest time.Time) {
// submitChunkJob hands job to the executor with pqs bound as its argument.
// Taking pqs as a parameter gives the submitted closure its own copy of the
// slice header, so the caller may reassign its variable afterwards without
// affecting the job.
func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) {
	run := func() {
		job(pqs)
	}
	te.Run(ctx, run)
}
func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, acc telegraf.Accumulator, job func([]types.PerfQuerySpec)) {
te := NewThrottledExecutor(e.Parent.CollectConcurrency)
maxMetrics := e.Parent.MaxQueryMetrics
if maxMetrics < 1 {
maxMetrics = 1
@@ -609,38 +684,30 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
total := 0
nRes := 0
for _, object := range res.objects {
info, found := e.instanceInfo[object.ref.Value]
if !found {
log.Printf("E! [input.vsphere]: Internal error: Instance info not found for MOID %s", object.ref)
}
mr := len(info.metrics)
mr := len(res.metrics)
for mr > 0 {
mc := mr
headroom := maxMetrics - metrics
if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics
mc = headroom
}
fm := len(info.metrics) - mr
fm := len(res.metrics) - mr
pq := types.PerfQuerySpec{
Entity: object.ref,
MaxSample: 1,
MetricId: info.metrics[fm : fm+mc],
MaxSample: maxSampleConst,
MetricId: res.metrics[fm : fm+mc],
IntervalId: res.sampling,
Format: "normal",
}
// For non-realtime metrics, we need to look back a few samples in case
// the vCenter is late reporting metrics.
if !res.realTime {
pq.MaxSample = metricLookback
start, ok := e.hwMarks.Get(object.ref.Value)
if !ok {
// Look back 3 sampling periods by default
start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
}
pq.StartTime = &start
pq.EndTime = &now
// Look back 3 sampling periods
start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
if !res.realTime {
pq.StartTime = &start
pq.EndTime = &now
}
pqs = append(pqs, pq)
mr -= mc
metrics += mc
@@ -648,17 +715,18 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
// We need to dump the current chunk of metrics for one of two reasons:
// 1) We filled up the metric quota while processing the current resource
// 2) We are at the last resource and have no more data to process.
if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects {
// 3) The query contains more than 100,000 individual metrics
if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 {
log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))
// To prevent deadlocks, don't send work items if the context has been cancelled.
// Don't send work items if the context has been cancelled.
if ctx.Err() == context.Canceled {
return
}
// Call push function
f(ctx, pqs)
// Run collection job
submitChunkJob(ctx, te, job, pqs)
pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
metrics = 0
nRes = 0
@@ -667,19 +735,19 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
total++
nRes++
}
// There may be dangling stuff in the queue. Handle them
//
// Handle final partially filled chunk
if len(pqs) > 0 {
// Call push function
// Run collection job
log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
f(ctx, pqs)
submitChunkJob(ctx, te, job, pqs)
}
// Wait for background collection to finish
te.Wait()
}
func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error {
// Do we have new data yet?
res := e.resourceKinds[resourceType]
client, err := e.clientFactory.GetClient(ctx)
if err != nil {
@@ -689,13 +757,23 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
if err != nil {
return err
}
latest, hasLatest := e.lastColls[resourceType]
if hasLatest {
// Estimate the interval at which we're invoked. Use local time (not server time)
// since this is about how we got invoked locally.
localNow := time.Now()
estInterval := time.Duration(time.Minute)
if !res.lastColl.IsZero() {
estInterval = localNow.Sub(res.lastColl).Truncate(time.Duration(res.sampling) * time.Second)
}
log.Printf("D! [inputs.vsphere] Interval estimated to %s", estInterval)
latest := res.latestSample
if !latest.IsZero() {
elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter.
log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
log.Printf("D! [inputs.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
if !res.realTime && elapsed < float64(res.sampling) {
// No new data would be available. We're outta herE! [input.vsphere]:
log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s",
// No new data would be available. We're outta here!
log.Printf("D! [inputs.vsphere]: Sampling period for %s of %d has not elapsed on %s",
resourceType, res.sampling, e.URL.Host)
return nil
}
@@ -706,91 +784,108 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
internalTags := map[string]string{"resourcetype": resourceType}
sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags)
log.Printf("D! [input.vsphere]: Collecting metrics for %d objects of type %s for %s",
log.Printf("D! [inputs.vsphere]: Collecting metrics for %d objects of type %s for %s",
len(res.objects), resourceType, e.URL.Host)
count := int64(0)
// Set up a worker pool for collecting chunk metrics
wp := NewWorkerPool(10)
wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} {
chunk := in.([]types.PerfQuerySpec)
n, err := e.collectChunk(ctx, chunk, resourceType, res, acc)
log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil {
return err
}
atomic.AddInt64(&count, int64(n))
return nil
var tsMux sync.Mutex
latestSample := time.Time{}
}, e.Parent.CollectConcurrency)
// Fill the input channel of the worker queue by running the chunking
// logic implemented in chunker()
wp.Fill(ctx, func(ctx context.Context, f PushFunc) {
e.chunker(ctx, f, &res, now, latest)
})
// Drain the pool. We're getting errors back. They should all be nil
var mux sync.Mutex
merr := make(multiError, 0)
wp.Drain(ctx, func(ctx context.Context, in interface{}) bool {
if in != nil {
mux.Lock()
defer mux.Unlock()
merr = append(merr, in.(error))
return false
}
return true
})
e.lastColls[resourceType] = now // Use value captured at the beginning to avoid blind spots.
// Divide workload into chunks and process them concurrently
e.chunkify(ctx, &res, now, latest, acc,
func(chunk []types.PerfQuerySpec) {
n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval)
log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil {
acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error()))
}
atomic.AddInt64(&count, int64(n))
tsMux.Lock()
defer tsMux.Unlock()
if localLatest.After(latestSample) && !localLatest.IsZero() {
latestSample = localLatest
}
})
log.Printf("D! [inputs.vsphere] Latest sample for %s set to %s", resourceType, latestSample)
if !latestSample.IsZero() {
res.latestSample = latestSample
}
sw.Stop()
SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count)
if len(merr) > 0 {
return merr
}
return nil
}
func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, resourceType string,
res resourceKind, acc telegraf.Accumulator) (int, error) {
// alignSamples aligns samples to interval-sized buckets. Each sample's
// timestamp is truncated to a multiple of interval; consecutive samples that
// fall into the same bucket are merged into one entry whose value is the mean
// of the merged samples. Negative values are skipped.
//
// info and values are expected to be parallel; if info is longer than values,
// processing stops at the end of values. The returned slices are parallel: one
// PerfSampleInfo (with the truncated timestamp and the Interval of the first
// sample in the bucket) per aggregated value.
func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) {
	rInfo := make([]types.PerfSampleInfo, 0, len(info))
	rValues := make([]float64, 0, len(values))
	bi := 1.0 // Number of samples merged into the current bucket
	var lastBucket time.Time
	for idx := range info {
		// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
		// data coming back with missing values. Take care of that gracefully!
		if idx >= len(values) {
			log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(info), len(values))
			break
		}
		v := float64(values[idx])
		if v < 0 {
			continue
		}
		roundedTs := info[idx].Timestamp.Truncate(interval)

		// Are we still working on the same bucket? Compare instants with Equal
		// (== also compares Location), and require an existing bucket so that a
		// first sample truncating to the zero time cannot index an empty slice.
		if len(rValues) > 0 && roundedTs.Equal(lastBucket) {
			bi++
			p := len(rValues) - 1
			// Incremental mean: fold the new value into the running average.
			rValues[p] = ((bi-1)/bi)*rValues[p] + v/bi
			continue
		}

		// Start a new bucket.
		rValues = append(rValues, v)
		rInfo = append(rInfo, types.PerfSampleInfo{
			Timestamp: roundedTs,
			Interval:  info[idx].Interval,
		})
		bi = 1.0
		lastBucket = roundedTs
	}
	return rInfo, rValues
}
func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator, now time.Time, interval time.Duration) (int, time.Time, error) {
log.Printf("D! [inputs.vsphere] Query for %s has %d QuerySpecs", res.name, len(pqs))
latestSample := time.Time{}
count := 0
resourceType := res.name
prefix := "vsphere" + e.Parent.Separator + resourceType
client, err := e.clientFactory.GetClient(ctx)
if err != nil {
return 0, err
return count, latestSample, err
}
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metricInfo, err := client.Perf.CounterInfoByName(ctx1)
metricInfo, err := client.CounterInfoByName(ctx)
if err != nil {
return count, err
return count, latestSample, err
}
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
metrics, err := client.Perf.Query(ctx2, pqs)
ems, err := client.QueryMetrics(ctx, pqs)
if err != nil {
return count, err
return count, latestSample, err
}
ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel3()
ems, err := client.Perf.ToMetricSeries(ctx3, metrics)
if err != nil {
return count, err
}
log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems))
log.Printf("D! [inputs.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems))
// Iterate through results
for _, em := range ems {
moid := em.Entity.Reference().Value
instInfo, found := e.instanceInfo[moid]
instInfo, found := res.objects[moid]
if !found {
log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid)
log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid)
continue
}
buckets := make(map[string]metricEntry)
@@ -805,26 +900,28 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
// Populate tags
objectRef, ok := res.objects[moid]
if !ok {
log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping", moid)
log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping", moid)
continue
}
e.populateTags(&objectRef, resourceType, &res, t, &v)
e.populateTags(&objectRef, resourceType, res, t, &v)
// Now deal with the values. Iterate backwards so we start with the latest value
tsKey := moid + "|" + name + "|" + v.Instance
for idx := len(v.Value) - 1; idx >= 0; idx-- {
ts := em.SampleInfo[idx].Timestamp
nValues := 0
alignedInfo, alignedValues := alignSamples(em.SampleInfo, v.Value, interval) // TODO: Estimate interval
// Since non-realtime metrics are queries with a lookback, we need to check the high-water mark
// to determine if this should be included. Only samples not seen before should be included.
if !(res.realTime || e.hwMarks.IsNew(tsKey, ts)) {
continue
for idx, sample := range alignedInfo {
// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
// data coming back with missing values. Take care of that gracefully!
if idx >= len(alignedValues) {
log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues))
break
}
value := v.Value[idx]
ts := sample.Timestamp
if ts.After(latestSample) {
latestSample = ts
}
nValues++
// Organize the metrics into a bucket per measurement.
// Data SHOULD be presented to us with the same timestamp for all samples, but in case
// they don't we use the measurement name + timestamp as the key for the bucket.
mn, fn := e.makeMetricIdentifier(prefix, name)
bKey := mn + " " + v.Instance + " " + strconv.FormatInt(ts.UnixNano(), 10)
bucket, found := buckets[bKey]
@@ -832,27 +929,26 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
bucket = metricEntry{name: mn, ts: ts, fields: make(map[string]interface{}), tags: t}
buckets[bKey] = bucket
}
if value < 0 {
log.Printf("D! [input.vsphere]: Negative value for %s on %s. Indicates missing samples", name, objectRef.name)
continue
}
// Percentage values must be scaled down by 100.
info, ok := metricInfo[name]
if !ok {
log.Printf("E! [input.vsphere]: Could not determine unit for %s. Skipping", name)
log.Printf("E! [inputs.vsphere]: Could not determine unit for %s. Skipping", name)
}
v := alignedValues[idx]
if info.UnitInfo.GetElementDescription().Key == "percent" {
bucket.fields[fn] = float64(value) / 100.0
bucket.fields[fn] = float64(v) / 100.0
} else {
bucket.fields[fn] = value
bucket.fields[fn] = v
}
count++
// Update highwater marks for non-realtime metrics.
if !res.realTime {
e.hwMarks.Put(tsKey, ts)
}
// Update highwater marks
e.hwMarks.Put(moid, ts)
}
if nValues == 0 {
log.Printf("D! [inputs.vsphere]: Missing value for: %s, %s", name, objectRef.name)
continue
}
}
// We've iterated through all the metrics and collected buckets for each
@@ -861,17 +957,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
acc.AddFields(bucket.name, bucket.fields, bucket.tags, bucket.ts)
}
}
return count, nil
}
func (e *Endpoint) getParent(obj resourceInfo) (resourceInfo, bool) {
p := obj.parentRef
if p == nil {
log.Printf("D! [input.vsphere] No parent found for %s", obj.name)
return resourceInfo{}, false
}
r, ok := e.instanceInfo[p.Value]
return r, ok
return count, latestSample, nil
}
func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resource *resourceKind, t map[string]string, v *performance.MetricSeries) {
@@ -885,14 +971,14 @@ func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resou
}
// Map parent reference
parent, found := e.instanceInfo[objectRef.parentRef.Value]
parent, found := e.getParent(objectRef, resource)
if found {
t[resource.parentTag] = parent.name
if resourceType == "vm" {
if objectRef.guest != "" {
t["guest"] = objectRef.guest
}
if c, ok := e.getParent(parent); ok {
if c, ok := e.resourceKinds["cluster"].objects[parent.parentRef.Value]; ok {
t["clustername"] = c.name
}
}