Add resource path based filtering to vsphere input (#5165)

This commit is contained in:
Pontus Rydin 2019-02-12 17:05:14 -05:00 committed by Daniel Nelson
parent ddf35ddaf3
commit c0bb8625dc
7 changed files with 641 additions and 116 deletions

View File

@ -27,6 +27,7 @@ vm_metric_exclude = [ "*" ]
## VMs
## Typical VM metrics (if omitted or empty, all metrics are collected)
# vm_include = [ "/*/vm/**"] # Inventory path to VMs to collect (by default all are collected)
vm_metric_include = [
"cpu.demand.average",
"cpu.idle.summation",
@ -68,6 +69,7 @@ vm_metric_exclude = [ "*" ]
## Hosts
## Typical host metrics (if omitted or empty, all metrics are collected)
# host_include = [ "/*/host/**"] # Inventory path to hosts to collect (by default all are collected)
host_metric_include = [
"cpu.coreUtilization.average",
"cpu.costop.summation",
@ -120,16 +122,19 @@ vm_metric_exclude = [ "*" ]
# host_instances = true ## true by default
## Clusters
# cluster_include = [ "/*/host/**"] # Inventory path to clusters to collect (by default all are collected)
# cluster_metric_include = [] ## if omitted or empty, all metrics are collected
# cluster_metric_exclude = [] ## Nothing excluded by default
# cluster_instances = false ## false by default
## Datastores
# datastore_include = [ "/*/datastore/**"] # Inventory path to datastores to collect (by default all are collected)
# datastore_metric_include = [] ## if omitted or empty, all metrics are collected
# datastore_metric_exclude = [] ## Nothing excluded by default
# datastore_instances = false ## false by default
## Datacenters
# datacenter_include = [ "/*" ] # Inventory path to datacenters to collect (by default all are collected)
datacenter_metric_include = [] ## if omitted or empty, all metrics are collected
datacenter_metric_exclude = [ "*" ] ## Datacenters are not collected by default.
# datacenter_instances = false ## false by default
@ -196,6 +201,48 @@ For setting up concurrency, modify `collect_concurrency` and `discover_concurren
# discover_concurrency = 1
```
### Inventory Paths
Resources to be monitored can be selected using Inventory Paths. This treats the vSphere inventory as a tree structure similar
to a file system. A vSphere inventory has a structure similar to this:
```
<root>
+-DC0 # Virtual datacenter
+-datastore # Datastore folder (created by system)
| +-Datastore1
+-host # Host folder (created by system)
| +-Cluster1
| | +-Host1
| | | +-VM1
| | | +-VM2
| | | +-hadoop1
| +-Host2 # Dummy cluster created for non-clustered host
| | +-Host2
| | | +-VM3
| | | +-VM4
+-vm # VM folder (created by system)
| +-VM1
| +-VM2
| +-Folder1
| | +-hadoop1
| | +-NestedFolder1
| | | +-VM3
| | | +-VM4
```
#### Using Inventory Paths
Using familiar UNIX-style paths, one could select e.g. VM2 with the path ```/DC0/vm/VM2```.
Often, we want to select a group of resources, such as all the VMs in a folder. We could use the path ```/DC0/vm/Folder1/*``` for that.
Another possibility is to select objects using a partial name, such as ```/DC0/vm/Folder1/hadoop*``` yielding all vms in Folder1 with a name starting with "hadoop".
Finally, due to the arbitrary nesting of the folder structure, we need a "recursive wildcard" for traversing multiple folders. We use the "**" symbol for that. If we want to look for a VM with a name starting with "hadoop" in any folder, we could use the following path: ```/DC0/vm/**/hadoop*```
#### Multiple paths to VMs
As we can see from the example tree above, VMs appear both in their own folder under the datacenter and under the hosts that run them. This is useful when you would like to select VMs on a specific host. For example, ```/DC0/host/Cluster1/Host1/hadoop*``` selects all VMs with a name starting with "hadoop" that are running on Host1.
We can extend this to looking at a cluster level: ```/DC0/host/Cluster1/*/hadoop*```. This selects any VM matching "hadoop*" on any host in Cluster1.
## Performance Considerations
### Realtime vs. historical metrics

View File

@ -74,7 +74,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
ctx1, cancel1 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel1()
if _, err := methods.GetCurrentTime(ctx1, cf.client.Client); err != nil {
log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!")
log.Printf("I! [inputs.vsphere]: Client session seems to have time out. Reauthenticating!")
ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel2()
if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
@ -102,7 +102,7 @@ func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
u.User = url.UserPassword(vs.Username, vs.Password)
}
log.Printf("D! [input.vsphere]: Creating client: %s", u.Host)
log.Printf("D! [inputs.vsphere]: Creating client: %s", u.Host)
soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify)
// Add certificate if we have it. Use it to log us in.
@ -173,9 +173,9 @@ func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
if err != nil {
return nil, err
}
log.Printf("D! [input.vsphere] vCenter says max_query_metrics should be %d", n)
log.Printf("D! [inputs.vsphere] vCenter says max_query_metrics should be %d", n)
if n < vs.MaxQueryMetrics {
log.Printf("W! [input.vsphere] Configured max_query_metrics is %d, but server limits it to %d. Reducing.", vs.MaxQueryMetrics, n)
log.Printf("W! [inputs.vsphere] Configured max_query_metrics is %d, but server limits it to %d. Reducing.", vs.MaxQueryMetrics, n)
vs.MaxQueryMetrics = n
}
return client, nil
@ -199,7 +199,7 @@ func (c *Client) close() {
defer cancel()
if c.Client != nil {
if err := c.Client.Logout(ctx); err != nil {
log.Printf("E! [input.vsphere]: Error during logout: %s", err)
log.Printf("E! [inputs.vsphere]: Error during logout: %s", err)
}
}
})
@ -228,7 +228,7 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) {
if s, ok := res[0].GetOptionValue().Value.(string); ok {
v, err := strconv.Atoi(s)
if err == nil {
log.Printf("D! [input.vsphere] vCenter maxQueryMetrics is defined: %d", v)
log.Printf("D! [inputs.vsphere] vCenter maxQueryMetrics is defined: %d", v)
if v == -1 {
// Whatever the server says, we never ask for more metrics than this.
return absoluteMaxMetrics, nil
@ -239,17 +239,17 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) {
// Fall through version-based inference if value isn't usable
}
} else {
log.Println("D! [input.vsphere] Option query for maxQueryMetrics failed. Using default")
log.Println("D! [inputs.vsphere] Option query for maxQueryMetrics failed. Using default")
}
// No usable maxQueryMetrics setting. Infer based on version
ver := c.Client.Client.ServiceContent.About.Version
parts := strings.Split(ver, ".")
if len(parts) < 2 {
log.Printf("W! [input.vsphere] vCenter returned an invalid version string: %s. Using default query size=64", ver)
log.Printf("W! [inputs.vsphere] vCenter returned an invalid version string: %s. Using default query size=64", ver)
return 64, nil
}
log.Printf("D! [input.vsphere] vCenter version is: %s", ver)
log.Printf("D! [inputs.vsphere] vCenter version is: %s", ver)
major, err := strconv.Atoi(parts[0])
if err != nil {
return 0, err

View File

@ -19,7 +19,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/performance"
"github.com/vmware/govmomi/view"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
@ -51,6 +50,7 @@ type Endpoint struct {
type resourceKind struct {
name string
vcName string
pKey string
parentTag string
enabled bool
@ -58,12 +58,13 @@ type resourceKind struct {
sampling int32
objects objectMap
filters filter.Filter
paths []string
collectInstances bool
getObjects func(context.Context, *Endpoint, *ResourceFilter) (objectMap, error)
include []string
simple bool
metrics performance.MetricList
collectInstances bool
parent string
getObjects func(context.Context, *Client, *Endpoint, *view.ContainerView) (objectMap, error)
latestSample time.Time
lastColl time.Time
}
@ -110,6 +111,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
e.resourceKinds = map[string]*resourceKind{
"datacenter": {
name: "datacenter",
vcName: "Datacenter",
pKey: "dcname",
parentTag: "",
enabled: anythingEnabled(parent.DatacenterMetricExclude),
@ -117,6 +119,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
paths: parent.DatacenterInclude,
simple: isSimple(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
include: parent.DatacenterMetricInclude,
collectInstances: parent.DatacenterInstances,
@ -125,6 +128,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
},
"cluster": {
name: "cluster",
vcName: "ClusterComputeResource",
pKey: "clustername",
parentTag: "dcname",
enabled: anythingEnabled(parent.ClusterMetricExclude),
@ -132,6 +136,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
paths: parent.ClusterInclude,
simple: isSimple(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
include: parent.ClusterMetricInclude,
collectInstances: parent.ClusterInstances,
@ -140,6 +145,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
},
"host": {
name: "host",
vcName: "HostSystem",
pKey: "esxhostname",
parentTag: "clustername",
enabled: anythingEnabled(parent.HostMetricExclude),
@ -147,6 +153,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude),
paths: parent.HostInclude,
simple: isSimple(parent.HostMetricInclude, parent.HostMetricExclude),
include: parent.HostMetricInclude,
collectInstances: parent.HostInstances,
@ -155,6 +162,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
},
"vm": {
name: "vm",
vcName: "VirtualMachine",
pKey: "vmname",
parentTag: "esxhostname",
enabled: anythingEnabled(parent.VMMetricExclude),
@ -162,6 +170,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude),
paths: parent.VMInclude,
simple: isSimple(parent.VMMetricInclude, parent.VMMetricExclude),
include: parent.VMMetricInclude,
collectInstances: parent.VMInstances,
@ -170,12 +179,14 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
},
"datastore": {
name: "datastore",
vcName: "Datastore",
pKey: "dsname",
enabled: anythingEnabled(parent.DatastoreMetricExclude),
realTime: false,
sampling: 300,
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
paths: parent.DatastoreInclude,
simple: isSimple(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
include: parent.DatastoreMetricInclude,
collectInstances: parent.DatastoreInstances,
@ -227,10 +238,10 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
case <-e.discoveryTicker.C:
err := e.discover(ctx)
if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error in discovery for %s: %v", e.URL.Host, err)
log.Printf("E! [inputs.vsphere]: Error in discovery for %s: %v", e.URL.Host, err)
}
case <-ctx.Done():
log.Printf("D! [input.vsphere]: Exiting discovery goroutine for %s", e.URL.Host)
log.Printf("D! [inputs.vsphere]: Exiting discovery goroutine for %s", e.URL.Host)
e.discoveryTicker.Stop()
return
}
@ -241,7 +252,7 @@ func (e *Endpoint) startDiscovery(ctx context.Context) {
func (e *Endpoint) initalDiscovery(ctx context.Context) {
err := e.discover(ctx)
if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error in discovery for %s: %v", e.URL.Host, err)
log.Printf("E! [inputs.vsphere]: Error in discovery for %s: %v", e.URL.Host, err)
}
e.startDiscovery(ctx)
}
@ -254,7 +265,7 @@ func (e *Endpoint) init(ctx context.Context) error {
// goroutine without waiting for it. This will probably cause us to report an empty
// dataset on the first collection, but it solves the issue of the first collection timing out.
if e.Parent.ForceDiscoverOnInit {
log.Printf("D! [input.vsphere]: Running initial discovery and waiting for it to finish")
log.Printf("D! [inputs.vsphere]: Running initial discovery and waiting for it to finish")
e.initalDiscovery(ctx)
} else {
// Otherwise, just run it in the background. We'll probably have an incomplete first metric
@ -317,7 +328,7 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache
defer cancel1()
err := o.Properties(ctx1, here, []string{"parent", "name"}, &result)
if err != nil {
log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err)
log.Printf("W! [inputs.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err)
break
}
if result.Reference().Type == "Datacenter" {
@ -326,7 +337,7 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache
break
}
if result.Parent == nil {
log.Printf("D! [input.vsphere]: No parent found for %s (ascending from %s)", here.Reference(), r.Reference())
log.Printf("D! [inputs.vsphere]: No parent found for %s (ascending from %s)", here.Reference(), r.Reference())
break
}
here = result.Parent.Reference()
@ -356,7 +367,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
return err
}
log.Printf("D! [input.vsphere]: Discover new objects for %s", e.URL.Host)
log.Printf("D! [inputs.vsphere]: Discover new objects for %s", e.URL.Host)
resourceKinds := make(map[string]resourceKind)
dcNameCache := make(map[string]string)
@ -365,10 +376,17 @@ func (e *Endpoint) discover(ctx context.Context) error {
// Populate resource objects, and endpoint instance info.
newObjects := make(map[string]objectMap)
for k, res := range e.resourceKinds {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
log.Printf("D! [inputs.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled
if res.enabled || k != "vm" {
objects, err := res.getObjects(ctx, client, e, client.Root)
rf := ResourceFilter{
finder: &Finder{client},
resType: res.vcName,
paths: res.paths}
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
objects, err := res.getObjects(ctx1, e, &rf)
if err != nil {
return err
}
@ -424,10 +442,10 @@ func (e *Endpoint) discover(ctx context.Context) error {
}
func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) {
log.Printf("D! [input.vsphere] Using fast metric metadata selection for %s", res.name)
log.Printf("D! [inputs.vsphere] Using fast metric metadata selection for %s", res.name)
m, err := client.CounterInfoByName(ctx)
if err != nil {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
log.Printf("E! [inputs.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
return
}
res.metrics = make(performance.MetricList, 0, len(res.include))
@ -443,7 +461,7 @@ func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res
}
res.metrics = append(res.metrics, cnt)
} else {
log.Printf("W! [input.vsphere] Metric name %s is unknown. Will not be collected", s)
log.Printf("W! [inputs.vsphere] Metric name %s is unknown. Will not be collected", s)
}
}
}
@ -476,7 +494,7 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
te.Run(ctx, func() {
metrics, err := e.getMetadata(ctx, obj, res.sampling)
if err != nil {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
log.Printf("E! [inputs.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
}
mMap := make(map[string]types.PerfMetricId)
for _, m := range metrics {
@ -489,7 +507,7 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m
}
}
log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name)
log.Printf("D! [inputs.vsphere] Found %d metrics for %s", len(mMap), obj.name)
instInfoMux.Lock()
defer instInfoMux.Unlock()
if len(mMap) > len(res.metrics) {
@ -506,9 +524,11 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind,
te.Wait()
}
func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getDatacenters(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap, error) {
var resources []mo.Datacenter
err := client.ListResources(ctx, root, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := filter.FindAll(ctx1, &resources)
if err != nil {
return nil, err
}
@ -520,9 +540,11 @@ func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view
return m, nil
}
func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getClusters(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap, error) {
var resources []mo.ClusterComputeResource
err := client.ListResources(ctx, root, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := filter.FindAll(ctx1, &resources)
if err != nil {
return nil, err
}
@ -532,13 +554,19 @@ func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.Co
// We're not interested in the immediate parent (a folder), but the data center.
p, ok := cache[r.Parent.Value]
if !ok {
o := object.NewFolder(root.Client(), *r.Parent)
var folder mo.Folder
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
err := o.Properties(ctx2, *r.Parent, []string{"parent"}, &folder)
client, err := e.clientFactory.GetClient(ctx2)
if err != nil {
log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err)
return nil, err
}
o := object.NewFolder(client.Client.Client, *r.Parent)
var folder mo.Folder
ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel3()
err = o.Properties(ctx3, *r.Parent, []string{"parent"}, &folder)
if err != nil {
log.Printf("W! [inputs.vsphere] Error while getting folder parent: %e", err)
p = nil
} else {
pp := folder.Parent.Reference()
@ -552,9 +580,9 @@ func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.Co
return m, nil
}
func getHosts(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getHosts(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap, error) {
var resources []mo.HostSystem
err := client.ListResources(ctx, root, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
err := filter.FindAll(ctx, &resources)
if err != nil {
return nil, err
}
@ -566,9 +594,11 @@ func getHosts(ctx context.Context, client *Client, e *Endpoint, root *view.Conta
return m, nil
}
func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getVMs(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap, error) {
var resources []mo.VirtualMachine
err := client.ListResources(ctx, root, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := filter.FindAll(ctx1, &resources)
if err != nil {
return nil, err
}
@ -591,9 +621,11 @@ func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.Contain
return m, nil
}
func getDatastores(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) {
func getDatastores(ctx context.Context, e *Endpoint, filter *ResourceFilter) (objectMap, error) {
var resources []mo.Datastore
err := client.ListResources(ctx, root, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := filter.FindAll(ctx1, &resources)
if err != nil {
return nil, err
}
@ -710,6 +742,14 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
pq.StartTime = &start
pq.EndTime = &now
// Make sure endtime is always after start time. We may occasionally see samples from the future
// returned from vCenter. This is presumably due to time drift between vCenter and EXSi nodes.
if pq.StartTime.After(*pq.EndTime) {
log.Printf("D! [inputs.vsphere] Future sample. Res: %s, StartTime: %s, EndTime: %s, Now: %s", pq.Entity, *pq.StartTime, *pq.EndTime, now)
end := start.Add(time.Second)
pq.EndTime = &end
}
pqs = append(pqs, pq)
mr -= mc
metrics += mc
@ -719,7 +759,7 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
// 2) We are at the last resource and have no more data to process.
// 3) The query contains more than 100,000 individual metrics
if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 {
log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
log.Printf("D! [inputs.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))
// Don't send work items if the context has been cancelled.
@ -740,7 +780,7 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
// Handle final partially filled chunk
if len(pqs) > 0 {
// Run collection job
log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
log.Printf("D! [inputs.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
submitChunkJob(ctx, te, job, pqs)
}

View File

@ -0,0 +1,241 @@
package vsphere
import (
"context"
"log"
"reflect"
"strings"
"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/view"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
// Lookup tables consulted while walking the inventory tree. Both are filled
// in by this file's init function.
var (
	// childTypes maps a managed object type to the object types that are
	// requested as its children when descending one level.
	childTypes map[string][]string

	// addFields maps a resource type to the properties retrieved in addition
	// to "name" when objects of that type are fetched as search results.
	addFields map[string][]string
)
// Finder allows callers to find resources in vCenter given a query string.
// The query string is an inventory path such as "/DC0/vm/**", which is
// resolved by walking the inventory from the root folder.
type Finder struct {
	// client is the vCenter client used for all inventory lookups.
	client *Client
}
// ResourceFilter is a convenience class holding a finder and a set of paths. It is useful when you need a
// self contained object capable of returning a certain set of resources.
type ResourceFilter struct {
	// finder performs the actual inventory search.
	finder *Finder
	// resType is the vSphere managed object type to look for (e.g. "VirtualMachine").
	resType string
	// paths holds the inventory paths to search; each one is passed to the finder in turn.
	paths []string
}
// nameAndRef pairs an object's name with its managed object reference.
// NOTE(review): no use of this type is visible in this part of the file —
// confirm it is still needed.
type nameAndRef struct {
	name string
	ref  types.ManagedObjectReference
}
// FindAll resolves each of the supplied inventory paths in order and collects
// the resources of type resType found under them into dst. The search stops
// at, and returns, the first error reported by Find.
func (f *Finder) FindAll(ctx context.Context, resType string, paths []string, dst interface{}) error {
	for i := range paths {
		err := f.Find(ctx, resType, paths[i], dst)
		if err != nil {
			return err
		}
	}
	return nil
}
// Find returns the resources matching the specified path.
//
// path is an inventory path such as "/DC0/vm/**". It is split on "/" and
// every element after the first becomes a name filter for one level of the
// inventory tree; the first element (the empty string in front of the leading
// "/") is ignored. Objects of type resType that match are converted to their
// typed representation and appended to dst, which must be a settable pointer
// (objectContentToTypedArray panics otherwise).
//
// An error is returned if the inventory traversal fails or if a matching
// object cannot be converted into dst.
func (f *Finder) Find(ctx context.Context, resType, path string, dst interface{}) error {
	p := strings.Split(path, "/")
	flt := make([]property.Filter, len(p)-1)
	for i := 1; i < len(p); i++ {
		flt[i-1] = property.Filter{"name": p[i]}
	}

	// Keyed by object reference so an object reached through more than one
	// route is only reported once.
	objs := make(map[string]types.ObjectContent)
	err := f.descend(ctx, f.client.Client.ServiceContent.RootFolder, resType, flt, 0, objs)
	if err != nil {
		return err
	}

	// Propagate conversion failures instead of silently returning a partial result.
	if err := objectContentToTypedArray(objs, dst); err != nil {
		return err
	}
	log.Printf("D! [inputs.vsphere] Find(%s, %s) returned %d objects", resType, path, len(objs))
	return nil
}
// descend walks one level of the inventory below root and recurses into the
// children that match the current path token.
//
// tokens holds one name filter per path element and pos is the index of the
// token being matched at this level. Objects of type resType that match the
// final token are stored in objs, keyed by the string form of their object
// reference. A token whose name is "**" is a recursive wildcard that may span
// any number of levels.
//
// The function returns nil without doing anything when there are no tokens
// left or when root is of a type that has no entry in childTypes. Errors from
// creating or querying container views are returned to the caller.
func (f *Finder) descend(ctx context.Context, root types.ManagedObjectReference, resType string,
	tokens []property.Filter, pos int, objs map[string]types.ObjectContent) error {
	isLeaf := pos == len(tokens)-1

	// No more tokens to match?
	if pos >= len(tokens) {
		return nil
	}

	// Determine child types
	ct, ok := childTypes[root.Reference().Type]
	if !ok {
		// We don't know how to handle children of this type. Stop descending.
		return nil
	}

	m := view.NewManager(f.client.Client.Client)
	// NOTE(review): this calls Destroy on the view manager itself, not on a
	// view — confirm that is intended.
	defer m.Destroy(ctx)
	v, err := m.CreateContainerView(ctx, root, ct, false)
	if err != nil {
		return err
	}
	defer v.Destroy(ctx)
	var content []types.ObjectContent

	fields := []string{"name"}
	if isLeaf {
		// Special case: The last token is a recursive wildcard, so we can grab everything
		// recursively in a single call.
		if tokens[pos]["name"] == "**" {
			v2, err := m.CreateContainerView(ctx, root, []string{resType}, true)
			// Check the error before deferring Destroy: the view must not be
			// used when its creation failed.
			if err != nil {
				return err
			}
			defer v2.Destroy(ctx)
			if af, ok := addFields[resType]; ok {
				fields = append(fields, af...)
			}
			err = v2.Retrieve(ctx, []string{resType}, fields, &content)
			if err != nil {
				return err
			}
			for _, c := range content {
				objs[c.Obj.String()] = c
			}
			return nil
		}

		// Ordinary leaf: fetch only objects of the requested type, including
		// the extra properties registered for that type.
		if af, ok := addFields[resType]; ok {
			fields = append(fields, af...)
		}
		err = v.Retrieve(ctx, []string{resType}, fields, &content)
		if err != nil {
			return err
		}
	} else {
		// Intermediate level: fetch every possible child type, names only.
		err = v.Retrieve(ctx, ct, fields, &content)
		if err != nil {
			return err
		}
	}

	for _, c := range content {
		// Only the first returned property is matched against the token.
		if !tokens[pos].MatchPropertyList(c.PropSet[:1]) {
			continue
		}

		// Already been here through another path? Skip!
		if _, ok := objs[root.Reference().String()]; ok {
			continue
		}

		if c.Obj.Type == resType && isLeaf {
			// We found what we're looking for. Consider it a leaf and stop descending
			objs[c.Obj.String()] = c
			continue
		}

		// Deal with recursive wildcards (**)
		inc := 1 // Normally we advance one token.
		if tokens[pos]["name"] == "**" {
			if isLeaf {
				// Not reached in practice: a trailing "**" returns from the
				// special case above before this loop runs.
				inc = 0 // Can't advance past last token, so keep descending the tree
			} else {
				// Lookahead to next token. If it matches this child, we are out of
				// the recursive wildcard handling and we can advance TWO tokens ahead, since
				// the token that ended the recursive wildcard mode is now consumed.
				if tokens[pos+1].MatchPropertyList(c.PropSet) {
					if pos < len(tokens)-2 {
						inc = 2
					} else {
						// We found match and it's at a leaf! Grab it!
						objs[c.Obj.String()] = c
						continue
					}
				} else {
					// We didn't break out of recursive wildcard mode yet, so stay on this token.
					inc = 0
				}
			}
		}
		err := f.descend(ctx, c.Obj, resType, tokens, pos+inc, objs)
		if err != nil {
			return err
		}
	}
	return nil
}
// nameFromObjectContent returns the value of the "name" property in o's
// property set. It returns "<unknown>" when no "name" property holding a
// string value is present.
func nameFromObjectContent(o types.ObjectContent) string {
	for _, p := range o.PropSet {
		if p.Name != "name" {
			continue
		}
		// Guard the assertion so an unexpected value type cannot panic.
		if s, ok := p.Val.(string); ok {
			return s
		}
	}
	return "<unknown>"
}
// objectContentToTypedArray converts every entry of objs to its typed
// managed-object representation and appends it to the value dst points to.
//
// dst must be a non-nil, settable pointer; the function panics otherwise.
// When an element is not of the destination's element type but embeds a field
// named after it (for example dst is []ManagedEntity and the object is a
// HostSystem), the embedded field is appended instead of the whole object.
// objs is a map, so the order of the appended elements is unspecified.
//
// The first conversion error is returned; elements appended before the
// failure remain in dst.
func objectContentToTypedArray(objs map[string]types.ObjectContent, dst interface{}) error {
	dstType := reflect.TypeOf(dst)
	if dstType == nil || dstType.Kind() != reflect.Ptr {
		panic("need pointer")
	}

	target := reflect.ValueOf(dst).Elem()
	if !target.CanSet() {
		panic("cannot set dst")
	}

	for _, oc := range objs {
		obj, err := mo.ObjectContentToType(oc)
		if err != nil {
			return err
		}

		objType := reflect.TypeOf(obj)
		item := reflect.ValueOf(obj)
		if !target.Type().AssignableTo(objType) {
			// Prefer the embedded field named after the destination's element type, if any.
			embedded, found := objType.FieldByName(dstType.Elem().Elem().Name())
			if found && embedded.Anonymous {
				item = item.FieldByIndex(embedded.Index)
			}
		}
		target.Set(reflect.Append(target, item))
	}
	return nil
}
// FindAll looks up every resource of the filter's resource type under the
// paths the ResourceFilter was created with, and stores the results in dst.
// It delegates to the underlying Finder and returns its error unchanged.
func (r *ResourceFilter) FindAll(ctx context.Context, dst interface{}) error {
	finder, kind, paths := r.finder, r.resType, r.paths
	return finder.FindAll(ctx, kind, paths, dst)
}
// init fills in the package-level lookup tables used when walking the
// inventory tree.
func init() {
	// Which object types are requested as children of each container type.
	childTypes = make(map[string][]string, 5)
	childTypes["HostSystem"] = []string{"VirtualMachine"}
	childTypes["ComputeResource"] = []string{"HostSystem", "ResourcePool"}
	childTypes["ClusterComputeResource"] = []string{"HostSystem", "ResourcePool"}
	childTypes["Datacenter"] = []string{"Folder"}
	childTypes["Folder"] = []string{
		"Folder",
		"Datacenter",
		"VirtualMachine",
		"ComputeResource",
		"ClusterComputeResource",
		"Datastore",
	}

	// Properties fetched in addition to "name" for each result type.
	addFields = make(map[string][]string, 5)
	addFields["HostSystem"] = []string{"parent"}
	addFields["VirtualMachine"] = []string{"runtime.host", "config.guestId", "config.uuid", "runtime.powerState"}
	addFields["Datastore"] = []string{"parent", "info"}
	addFields["ClusterComputeResource"] = []string{"parent"}
	addFields["Datacenter"] = []string{"parent"}
}

View File

@ -34,7 +34,7 @@ func (t *TSCache) Purge() {
n++
}
}
log.Printf("D! [input.vsphere] Purged timestamp cache. %d deleted with %d remaining", n, len(t.table))
log.Printf("D! [inputs.vsphere] Purged timestamp cache. %d deleted with %d remaining", n, len(t.table))
}
// IsNew returns true if the supplied timestamp for the supplied key is more recent than the

View File

@ -22,18 +22,23 @@ type VSphere struct {
DatacenterInstances bool
DatacenterMetricInclude []string
DatacenterMetricExclude []string
DatacenterInclude []string
ClusterInstances bool
ClusterMetricInclude []string
ClusterMetricExclude []string
ClusterInclude []string
HostInstances bool
HostMetricInclude []string
HostMetricExclude []string
HostInclude []string
VMInstances bool `toml:"vm_instances"`
VMMetricInclude []string `toml:"vm_metric_include"`
VMMetricExclude []string `toml:"vm_metric_exclude"`
VMInclude []string `toml:"vm_include"`
DatastoreInstances bool
DatastoreMetricInclude []string
DatastoreMetricExclude []string
DatastoreInclude []string
Separator string
MaxQueryObjects int
@ -216,7 +221,7 @@ func (v *VSphere) Description() string {
// Start is called from telegraf core when a plugin is started and allows it to
// perform initialization tasks.
func (v *VSphere) Start(acc telegraf.Accumulator) error {
log.Println("D! [input.vsphere]: Starting plugin")
log.Println("D! [inputs.vsphere]: Starting plugin")
ctx, cancel := context.WithCancel(context.Background())
v.cancel = cancel
@ -239,7 +244,7 @@ func (v *VSphere) Start(acc telegraf.Accumulator) error {
// Stop is called from telegraf core when a plugin is stopped and allows it to
// perform shutdown tasks.
func (v *VSphere) Stop() {
log.Println("D! [input.vsphere]: Stopping plugin")
log.Println("D! [inputs.vsphere]: Stopping plugin")
v.cancel()
// Wait for all endpoints to finish. No need to wait for
@ -248,7 +253,7 @@ func (v *VSphere) Stop() {
// wait for any discovery to complete by trying to grab the
// "busy" mutex.
for _, ep := range v.endpoints {
log.Printf("D! [input.vsphere]: Waiting for endpoint %s to finish", ep.URL.Host)
log.Printf("D! [inputs.vsphere]: Waiting for endpoint %s to finish", ep.URL.Host)
func() {
ep.busy.Lock() // Wait until discovery is finished
defer ep.busy.Unlock()
@ -286,18 +291,26 @@ func init() {
return &VSphere{
Vcenters: []string{},
DatacenterInstances: false,
DatacenterMetricInclude: nil,
DatacenterMetricExclude: nil,
DatacenterInclude: []string{"/*"},
ClusterInstances: false,
ClusterMetricInclude: nil,
ClusterMetricExclude: nil,
ClusterInclude: []string{"/*/host/**"},
HostInstances: true,
HostMetricInclude: nil,
HostMetricExclude: nil,
HostInclude: []string{"/*/host/**"},
VMInstances: true,
VMMetricInclude: nil,
VMMetricExclude: nil,
VMInclude: []string{"/*/vm/**"},
DatastoreInstances: false,
DatastoreMetricInclude: nil,
DatastoreMetricExclude: nil,
DatastoreInclude: []string{"/*/datastore/**"},
Separator: "_",
MaxQueryObjects: 256,

View File

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/simulator"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
@ -112,69 +113,105 @@ func defaultVSphere() *VSphere {
"mem.usage.*",
"mem.active.*"},
ClusterMetricExclude: nil,
ClusterInclude: []string{"/**"},
HostMetricInclude: []string{
"cpu.ready.summation.delta.millisecond",
"cpu.latency.average.rate.percent",
"cpu.coreUtilization.average.rate.percent",
"mem.usage.average.absolute.percent",
"mem.swapinRate.average.rate.kiloBytesPerSecond",
"mem.state.latest.absolute.number",
"mem.latency.average.absolute.percent",
"mem.vmmemctl.average.absolute.kiloBytes",
"disk.read.average.rate.kiloBytesPerSecond",
"disk.write.average.rate.kiloBytesPerSecond",
"disk.numberReadAveraged.average.rate.number",
"disk.numberWriteAveraged.average.rate.number",
"disk.deviceReadLatency.average.absolute.millisecond",
"disk.deviceWriteLatency.average.absolute.millisecond",
"disk.totalReadLatency.average.absolute.millisecond",
"disk.totalWriteLatency.average.absolute.millisecond",
"storageAdapter.read.average.rate.kiloBytesPerSecond",
"storageAdapter.write.average.rate.kiloBytesPerSecond",
"storageAdapter.numberReadAveraged.average.rate.number",
"storageAdapter.numberWriteAveraged.average.rate.number",
"net.errorsRx.summation.delta.number",
"net.errorsTx.summation.delta.number",
"net.bytesRx.average.rate.kiloBytesPerSecond",
"net.bytesTx.average.rate.kiloBytesPerSecond",
"cpu.used.summation.delta.millisecond",
"cpu.usage.average.rate.percent",
"cpu.utilization.average.rate.percent",
"cpu.wait.summation.delta.millisecond",
"cpu.idle.summation.delta.millisecond",
"cpu.readiness.average.rate.percent",
"cpu.costop.summation.delta.millisecond",
"cpu.swapwait.summation.delta.millisecond",
"mem.swapoutRate.average.rate.kiloBytesPerSecond",
"disk.kernelReadLatency.average.absolute.millisecond",
"disk.kernelWriteLatency.average.absolute.millisecond"},
"cpu.coreUtilization.average",
"cpu.costop.summation",
"cpu.demand.average",
"cpu.idle.summation",
"cpu.latency.average",
"cpu.readiness.average",
"cpu.ready.summation",
"cpu.swapwait.summation",
"cpu.usage.average",
"cpu.usagemhz.average",
"cpu.used.summation",
"cpu.utilization.average",
"cpu.wait.summation",
"disk.deviceReadLatency.average",
"disk.deviceWriteLatency.average",
"disk.kernelReadLatency.average",
"disk.kernelWriteLatency.average",
"disk.numberReadAveraged.average",
"disk.numberWriteAveraged.average",
"disk.read.average",
"disk.totalReadLatency.average",
"disk.totalWriteLatency.average",
"disk.write.average",
"mem.active.average",
"mem.latency.average",
"mem.state.latest",
"mem.swapin.average",
"mem.swapinRate.average",
"mem.swapout.average",
"mem.swapoutRate.average",
"mem.totalCapacity.average",
"mem.usage.average",
"mem.vmmemctl.average",
"net.bytesRx.average",
"net.bytesTx.average",
"net.droppedRx.summation",
"net.droppedTx.summation",
"net.errorsRx.summation",
"net.errorsTx.summation",
"net.usage.average",
"power.power.average",
"storageAdapter.numberReadAveraged.average",
"storageAdapter.numberWriteAveraged.average",
"storageAdapter.read.average",
"storageAdapter.write.average",
"sys.uptime.latest"},
HostMetricExclude: nil,
HostInclude: []string{"/**"},
VMMetricInclude: []string{
"cpu.ready.summation.delta.millisecond",
"mem.swapinRate.average.rate.kiloBytesPerSecond",
"virtualDisk.numberReadAveraged.average.rate.number",
"virtualDisk.numberWriteAveraged.average.rate.number",
"virtualDisk.totalReadLatency.average.absolute.millisecond",
"virtualDisk.totalWriteLatency.average.absolute.millisecond",
"virtualDisk.readOIO.latest.absolute.number",
"virtualDisk.writeOIO.latest.absolute.number",
"net.bytesRx.average.rate.kiloBytesPerSecond",
"net.bytesTx.average.rate.kiloBytesPerSecond",
"net.droppedRx.summation.delta.number",
"net.droppedTx.summation.delta.number",
"cpu.run.summation.delta.millisecond",
"cpu.used.summation.delta.millisecond",
"mem.swapoutRate.average.rate.kiloBytesPerSecond",
"virtualDisk.read.average.rate.kiloBytesPerSecond",
"virtualDisk.write.average.rate.kiloBytesPerSecond"},
"cpu.demand.average",
"cpu.idle.summation",
"cpu.latency.average",
"cpu.readiness.average",
"cpu.ready.summation",
"cpu.run.summation",
"cpu.usagemhz.average",
"cpu.used.summation",
"cpu.wait.summation",
"mem.active.average",
"mem.granted.average",
"mem.latency.average",
"mem.swapin.average",
"mem.swapinRate.average",
"mem.swapout.average",
"mem.swapoutRate.average",
"mem.usage.average",
"mem.vmmemctl.average",
"net.bytesRx.average",
"net.bytesTx.average",
"net.droppedRx.summation",
"net.droppedTx.summation",
"net.usage.average",
"power.power.average",
"virtualDisk.numberReadAveraged.average",
"virtualDisk.numberWriteAveraged.average",
"virtualDisk.read.average",
"virtualDisk.readOIO.latest",
"virtualDisk.throughput.usage.average",
"virtualDisk.totalReadLatency.average",
"virtualDisk.totalWriteLatency.average",
"virtualDisk.write.average",
"virtualDisk.writeOIO.latest",
"sys.uptime.latest"},
VMMetricExclude: nil,
VMInclude: []string{"/**"},
DatastoreMetricInclude: []string{
"disk.used.*",
"disk.provsioned.*"},
DatastoreMetricExclude: nil,
DatastoreInclude: []string{"/**"},
DatacenterMetricInclude: nil,
DatacenterMetricExclude: nil,
DatacenterInclude: []string{"/**"},
ClientConfig: itls.ClientConfig{InsecureSkipVerify: true},
MaxQueryObjects: 256,
MaxQueryMetrics: 256,
ObjectDiscoveryInterval: internal.Duration{Duration: time.Second * 300},
Timeout: internal.Duration{Duration: time.Second * 20},
ForceDiscoverOnInit: true,
@ -197,6 +234,50 @@ func createSim() (*simulator.Model, *simulator.Server, error) {
return model, s, nil
}
// testAlignUniform feeds alignSamples n samples spaced 20 seconds apart, each
// with the value 1, requests a 60-second interval, and requires that n/3
// results come back and that every returned value equals 1.
func testAlignUniform(t *testing.T, n int) {
	// Mark as a helper so a failure is attributed to the calling test's line.
	t.Helper()

	now := time.Now().Truncate(60 * time.Second)
	info := make([]types.PerfSampleInfo, n)
	values := make([]int64, n)
	for i := 0; i < n; i++ {
		info[i] = types.PerfSampleInfo{
			Timestamp: now.Add(time.Duration(20*i) * time.Second),
			Interval:  20,
		}
		values[i] = 1
	}
	newInfo, newValues := alignSamples(info, values, 60*time.Second)
	require.Equal(t, n/3, len(newInfo), "Aligned infos have wrong size")
	require.Equal(t, n/3, len(newValues), "Aligned values have wrong size")
	for _, v := range newValues {
		require.Equal(t, 1.0, v, "Aligned value should be 1")
	}
}
// TestAlignMetrics runs the uniform alignment check for several sample counts
// and then verifies a repeating 1,2,3 series of 20-second samples: aligning
// it to 60 seconds must yield one third as many results, each equal to 2.
func TestAlignMetrics(t *testing.T) {
	for _, size := range []int{3, 30, 333} {
		testAlignUniform(t, size)
	}

	// 20s to 60s of 1,2,3,1,2,3... (should average to 2)
	const count = 30
	base := time.Now().Truncate(60 * time.Second)
	samples := make([]types.PerfSampleInfo, 0, count)
	raw := make([]int64, 0, count)
	for idx := 0; idx < count; idx++ {
		offset := time.Duration(20*idx) * time.Second
		samples = append(samples, types.PerfSampleInfo{
			Timestamp: base.Add(offset),
			Interval:  20,
		})
		raw = append(raw, int64(idx%3+1))
	}

	alignedInfo, alignedValues := alignSamples(samples, raw, 60*time.Second)
	require.Equal(t, count/3, len(alignedInfo), "Aligned infos have wrong size")
	require.Equal(t, count/3, len(alignedValues), "Aligned values have wrong size")
	for _, got := range alignedValues {
		require.Equal(t, 2.0, got, "Aligned value should be 2")
	}
}
func TestParseConfig(t *testing.T) {
v := VSphere{}
c := v.SampleConfig()
@ -209,7 +290,7 @@ func TestParseConfig(t *testing.T) {
require.NotNil(t, tab)
}
func TestWorkerPool(t *testing.T) {
func TestThrottledExecutor(t *testing.T) {
max := int64(0)
ngr := int64(0)
n := 10000
@ -254,14 +335,13 @@ func TestTimeout(t *testing.T) {
defer m.Remove()
defer s.Close()
var acc testutil.Accumulator
v := defaultVSphere()
var acc testutil.Accumulator
v.Vcenters = []string{s.URL.String()}
v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond}
require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil.
defer v.Stop()
err = v.Gather(&acc)
require.True(t, len(acc.Errors) > 0, "Errors should not be empty here")
// The accumulator must contain exactly one error and it must be a deadline exceeded.
require.Equal(t, 1, len(acc.Errors))
@ -311,6 +391,109 @@ func TestMaxQuery(t *testing.T) {
c2.close()
}
// TestFinder resolves inventory paths against the simulator created by
// createSim. It covers exact paths, single-level "*" wildcards, recursive
// "**" wildcards, and FindAll with one or more patterns, for datacenters,
// hosts and virtual machines.
func TestFinder(t *testing.T) {
	// Don't run test on 32-bit machines due to bug in simulator.
	// https://github.com/vmware/govmomi/issues/1330
	var i int
	if unsafe.Sizeof(i) < 8 {
		return
	}

	m, s, err := createSim()
	if err != nil {
		t.Fatal(err)
	}
	defer m.Remove()
	defer s.Close()

	v := defaultVSphere()
	ctx := context.Background()

	c, err := NewClient(ctx, s.URL, v)
	// Check the connection error here; otherwise the first Find below
	// overwrites err and a failed client surfaces as a confusing lookup error.
	require.NoError(t, err)

	f := Finder{c}

	// Datacenter by exact path.
	dc := []mo.Datacenter{}
	err = f.Find(ctx, "Datacenter", "/DC0", &dc)
	require.NoError(t, err)
	require.Equal(t, 1, len(dc))
	require.Equal(t, "DC0", dc[0].Name)

	// Hosts by exact path: one addressed as /DC0/host/DC0_H0/DC0_H0 and one
	// addressed under DC0_C0.
	host := []mo.HostSystem{}
	err = f.Find(ctx, "HostSystem", "/DC0/host/DC0_H0/DC0_H0", &host)
	require.NoError(t, err)
	require.Equal(t, 1, len(host))
	require.Equal(t, "DC0_H0", host[0].Name)

	host = []mo.HostSystem{}
	err = f.Find(ctx, "HostSystem", "/DC0/host/DC0_C0/DC0_C0_H0", &host)
	require.NoError(t, err)
	require.Equal(t, 1, len(host))
	require.Equal(t, "DC0_C0_H0", host[0].Name)

	// Single-level wildcard over everything directly under DC0_C0.
	host = []mo.HostSystem{}
	err = f.Find(ctx, "HostSystem", "/DC0/host/DC0_C0/*", &host)
	require.NoError(t, err)
	require.Equal(t, 3, len(host))

	// VM by exact path. The length check must be on vm (not dc) so that the
	// vm[0] access below cannot panic on an empty result.
	vm := []mo.VirtualMachine{}
	err = f.Find(ctx, "VirtualMachine", "/DC0/vm/DC0_H0_VM0", &vm)
	require.NoError(t, err)
	require.Equal(t, 1, len(vm))
	require.Equal(t, "DC0_H0_VM0", vm[0].Name)

	// VM name prefix wildcard. Expected count of 2 follows from the other
	// assertions in this test: the two-pattern FindAll below yields 4 VMs and
	// the DC0_H0 patterns account for 2 of them.
	vm = []mo.VirtualMachine{}
	err = f.Find(ctx, "VirtualMachine", "/DC0/vm/DC0_C0*", &vm)
	require.NoError(t, err)
	require.Equal(t, 2, len(vm))

	// Wildcard in the folder position, exact VM name.
	vm = []mo.VirtualMachine{}
	err = f.Find(ctx, "VirtualMachine", "/DC0/*/DC0_H0_VM0", &vm)
	require.NoError(t, err)
	require.Equal(t, 1, len(vm))
	require.Equal(t, "DC0_H0_VM0", vm[0].Name)

	// Patterns for which only the number of matching VMs is checked.
	countCases := []struct {
		path string
		want int
	}{
		{"/DC0/*/DC0_H0_*", 2},
		{"/DC0/**/DC0_H0_VM*", 2},
		{"/DC0/**", 4},
		{"/**", 4},
		{"/**/DC0_H0_VM*", 2},
		{"/**/vm/**", 4},
	}
	for _, tc := range countCases {
		// Start from a fresh slice for every lookup, as the original test did.
		vm = []mo.VirtualMachine{}
		err = f.Find(ctx, "VirtualMachine", tc.path, &vm)
		require.NoError(t, err, "path %s", tc.path)
		require.Equal(t, tc.want, len(vm), "path %s", tc.path)
	}

	// FindAll with two patterns, then with a single catch-all pattern.
	vm = []mo.VirtualMachine{}
	err = f.FindAll(ctx, "VirtualMachine", []string{"/DC0/vm/DC0_H0*", "/DC0/vm/DC0_C0*"}, &vm)
	require.NoError(t, err)
	require.Equal(t, 4, len(vm))

	vm = []mo.VirtualMachine{}
	err = f.FindAll(ctx, "VirtualMachine", []string{"/**"}, &vm)
	require.NoError(t, err)
	require.Equal(t, 4, len(vm))
}
func TestAll(t *testing.T) {
// Don't run test on 32-bit machines due to bug in simulator.
// https://github.com/vmware/govmomi/issues/1330
@ -333,4 +516,5 @@ func TestAll(t *testing.T) {
defer v.Stop()
require.NoError(t, v.Gather(&acc))
require.Equal(t, 0, len(acc.Errors), fmt.Sprintf("Errors found: %s", acc.Errors))
require.True(t, len(acc.Metrics) > 0, "No metrics were collected")
}