Add input plugin for DC/OS (#3519)
This commit is contained in:
parent
b6e8214396
commit
2ce21bff24
1
Godeps
1
Godeps
|
@ -10,6 +10,7 @@ github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28
|
||||||
github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29
|
github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29
|
||||||
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
|
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
|
||||||
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
|
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
|
||||||
|
github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29
|
||||||
github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27
|
github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27
|
||||||
github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a
|
github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a
|
||||||
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
|
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
|
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
|
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
|
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/dcos"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
|
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/dmcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/dmcache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
|
_ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
|
||||||
|
|
|
@ -0,0 +1,209 @@
|
||||||
|
# DC/OS Input Plugin
|
||||||
|
|
||||||
|
This input plugin gathers metrics from a DC/OS cluster's [metrics component](https://docs.mesosphere.com/1.10/metrics/).
|
||||||
|
|
||||||
|
**Series Cardinality Warning**
|
||||||
|
|
||||||
|
Depending on the work load of your DC/OS cluster, this plugin can quickly
|
||||||
|
create a high number of series which, when unchecked, can cause high load on
|
||||||
|
your database.
|
||||||
|
|
||||||
|
- Use [measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering) liberally to exclude unneeded metrics, as well as the node, container, and app include/exclude options.
|
||||||
|
- Write to a database with an appropriate [retention policy](https://docs.influxdata.com/influxdb/v1.3/concepts/glossary/#retention-policy-rp).
|
||||||
|
- Limit the number of series allowed in your database using the `max-series-per-database` and `max-values-per-tag` settings.
|
||||||
|
- Consider enabling the [TSI](https://docs.influxdata.com/influxdb/v1.3/about_the_project/releasenotes-changelog/#release-notes-8) engine.
|
||||||
|
- Monitor your [series cardinality](https://docs.influxdata.com/influxdb/v1.3/troubleshooting/frequently-asked-questions/#how-can-i-query-for-series-cardinality).
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
```toml
|
||||||
|
[[inputs.dcos]]
|
||||||
|
## The DC/OS cluster URL.
|
||||||
|
cluster_url = "https://dcos-master-1"
|
||||||
|
|
||||||
|
## The ID of the service account.
|
||||||
|
service_account_id = "telegraf"
|
||||||
|
## The private key file for the service account.
|
||||||
|
service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem"
|
||||||
|
|
||||||
|
## Path containing login token. If set, will read on every gather.
|
||||||
|
# token_file = "/home/dcos/.dcos/token"
|
||||||
|
|
||||||
|
## In all filter options if both include and exclude are empty all items
|
||||||
|
## will be collected. Arrays may contain glob patterns.
|
||||||
|
##
|
||||||
|
## Node IDs to collect metrics from. If a node is excluded, no metrics will
|
||||||
|
## be collected for its containers or apps.
|
||||||
|
# node_include = []
|
||||||
|
# node_exclude = []
|
||||||
|
## Container IDs to collect container metrics from.
|
||||||
|
# container_include = []
|
||||||
|
# container_exclude = []
|
||||||
|
## Container IDs to collect app metrics from.
|
||||||
|
# app_include = []
|
||||||
|
# app_exclude = []
|
||||||
|
|
||||||
|
## Maximum concurrent connections to the cluster.
|
||||||
|
# max_connections = 10
|
||||||
|
## Maximum time to receive a response from cluster.
|
||||||
|
# response_timeout = "20s"
|
||||||
|
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## If true, skip chain & host verification
|
||||||
|
# insecure_skip_verify = true
|
||||||
|
|
||||||
|
## Recommended filtering to reduce series cardinality.
|
||||||
|
# [inputs.dcos.tagdrop]
|
||||||
|
# path = ["/var/lib/mesos/slave/slaves/*"]
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Enterprise Authentication
|
||||||
|
|
||||||
|
When using Enterprise DC/OS, it is recommended to use a service account to
|
||||||
|
authenticate with the cluster.
|
||||||
|
|
||||||
|
The plugin requires the following permissions:
|
||||||
|
```
|
||||||
|
dcos:adminrouter:ops:system-metrics full
|
||||||
|
dcos:adminrouter:ops:mesos full
|
||||||
|
```
|
||||||
|
|
||||||
|
Follow the directions to [create a service account and assign permissions](https://docs.mesosphere.com/1.10/security/service-auth/custom-service-auth/).
|
||||||
|
|
||||||
|
Quick configuration using the Enterprise CLI:
|
||||||
|
```
|
||||||
|
dcos security org service-accounts keypair telegraf-sa-key.pem telegraf-sa-cert.pem
|
||||||
|
dcos security org service-accounts create -p telegraf-sa-cert.pem -d "Telegraf DC/OS input plugin" telegraf
|
||||||
|
dcos security org users grant telegraf dcos:adminrouter:ops:system-metrics full
|
||||||
|
dcos security org users grant telegraf dcos:adminrouter:ops:mesos full
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Open Source Authentication
|
||||||
|
|
||||||
|
The Open Source DC/OS does not provide service accounts. Instead you can use
|
||||||
|
one of the following options:
|
||||||
|
|
||||||
|
1. [Disable authentication](https://dcos.io/docs/1.10/security/managing-authentication/#authentication-opt-out)
|
||||||
|
2. Use the `token_file` parameter to read an authentication token from a file.
|
||||||
|
|
||||||
|
Then `token_file` can be set by using the [dcos cli] to login periodically.
|
||||||
|
The cli can login for at most XXX days, you will need to ensure the cli
|
||||||
|
performs a new login before this time expires.
|
||||||
|
```
|
||||||
|
dcos auth login --username foo --password bar
|
||||||
|
dcos config show core.dcos_acs_token > ~/.dcos/token
|
||||||
|
```
|
||||||
|
|
||||||
|
Another option to create a `token_file` is to generate a token using the
|
||||||
|
cluster secret. This will allow you to set the expiration date manually or
|
||||||
|
even create a never expiring token. However, if the cluster secret or the
|
||||||
|
token is compromised it cannot be revoked and may require a full reinstall of
|
||||||
|
the cluster. For more information on this technique reference
|
||||||
|
[this blog post](https://medium.com/@richardgirges/authenticating-open-source-dc-os-with-third-party-services-125fa33a5add).
|
||||||
|
|
||||||
|
### Metrics:
|
||||||
|
|
||||||
|
Please consult the [Metrics Reference](https://docs.mesosphere.com/1.10/metrics/reference/)
|
||||||
|
for details on field interpretation.
|
||||||
|
|
||||||
|
- dcos_node
|
||||||
|
- tags:
|
||||||
|
- cluster
|
||||||
|
- hostname
|
||||||
|
- path (filesystem fields only)
|
||||||
|
- interface (network fields only)
|
||||||
|
- fields:
|
||||||
|
- system_uptime (float)
|
||||||
|
- cpu_cores (float)
|
||||||
|
- cpu_total (float)
|
||||||
|
- cpu_user (float)
|
||||||
|
- cpu_system (float)
|
||||||
|
- cpu_idle (float)
|
||||||
|
- cpu_wait (float)
|
||||||
|
- load_1min (float)
|
||||||
|
- load_5min (float)
|
||||||
|
- load_15min (float)
|
||||||
|
- filesystem_capacity_total_bytes (int)
|
||||||
|
- filesystem_capacity_used_bytes (int)
|
||||||
|
- filesystem_capacity_free_bytes (int)
|
||||||
|
- filesystem_inode_total (float)
|
||||||
|
- filesystem_inode_used (float)
|
||||||
|
- filesystem_inode_free (float)
|
||||||
|
- memory_total_bytes (int)
|
||||||
|
- memory_free_bytes (int)
|
||||||
|
- memory_buffers_bytes (int)
|
||||||
|
- memory_cached_bytes (int)
|
||||||
|
- swap_total_bytes (int)
|
||||||
|
- swap_free_bytes (int)
|
||||||
|
- swap_used_bytes (int)
|
||||||
|
- network_in_bytes (int)
|
||||||
|
- network_out_bytes (int)
|
||||||
|
- network_in_packets (float)
|
||||||
|
- network_out_packets (float)
|
||||||
|
- network_in_dropped (float)
|
||||||
|
- network_out_dropped (float)
|
||||||
|
- network_in_errors (float)
|
||||||
|
- network_out_errors (float)
|
||||||
|
- process_count (float)
|
||||||
|
|
||||||
|
- dcos_container
|
||||||
|
- tags:
|
||||||
|
- cluster
|
||||||
|
- hostname
|
||||||
|
- container_id
|
||||||
|
- task_name
|
||||||
|
- fields:
|
||||||
|
- cpus_limit (float)
|
||||||
|
- cpus_system_time (float)
|
||||||
|
- cpus_throttled_time (float)
|
||||||
|
- cpus_user_time (float)
|
||||||
|
- disk_limit_bytes (int)
|
||||||
|
- disk_used_bytes (int)
|
||||||
|
- mem_limit_bytes (int)
|
||||||
|
- mem_total_bytes (int)
|
||||||
|
- net_rx_bytes (int)
|
||||||
|
- net_rx_dropped (float)
|
||||||
|
- net_rx_errors (float)
|
||||||
|
- net_rx_packets (float)
|
||||||
|
- net_tx_bytes (int)
|
||||||
|
- net_tx_dropped (float)
|
||||||
|
- net_tx_errors (float)
|
||||||
|
- net_tx_packets (float)
|
||||||
|
|
||||||
|
- dcos_app
|
||||||
|
- tags:
|
||||||
|
- cluster
|
||||||
|
- hostname
|
||||||
|
- container_id
|
||||||
|
- task_name
|
||||||
|
- fields:
|
||||||
|
- fields are application specific
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
```
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/boot filesystem_capacity_free_bytes=918188032i,filesystem_capacity_total_bytes=1063256064i,filesystem_capacity_used_bytes=145068032i,filesystem_inode_free=523958,filesystem_inode_total=524288,filesystem_inode_used=330 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=dummy0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=docker0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18 cpu_cores=2,cpu_idle=81.62,cpu_system=4.19,cpu_total=13.670000000000002,cpu_user=9.48,cpu_wait=0,load_15min=0.7,load_1min=0.22,load_5min=0.6,memory_buffers_bytes=970752i,memory_cached_bytes=1830473728i,memory_free_bytes=1178636288i,memory_total_bytes=3975073792i,process_count=198,swap_free_bytes=859828224i,swap_total_bytes=859828224i,swap_used_bytes=0i,system_uptime=18874 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=lo network_in_bytes=1090992450i,network_in_dropped=0,network_in_errors=0,network_in_packets=1546938,network_out_bytes=1090992450i,network_out_dropped=0,network_out_errors=0,network_out_packets=1546938 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/ filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=minuteman network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=eth0 network_in_bytes=539886216i,network_in_dropped=1,network_in_errors=0,network_in_packets=979808,network_out_bytes=112395836i,network_out_dropped=0,network_out_errors=0,network_out_packets=891239 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=spartan network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/overlay filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=vtep1024 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/plugins filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
|
||||||
|
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=d-dcos network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
|
||||||
|
dcos_app,cluster=enterprise,container_id=9a78d34a-3bbf-467e-81cf-a57737f154ee,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
|
||||||
|
dcos_container,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 cpus_limit=0.3,cpus_system_time=307.31,cpus_throttled_time=102.029930607,cpus_user_time=268.57,disk_limit_bytes=268435456i,disk_used_bytes=30953472i,mem_limit_bytes=570425344i,mem_total_bytes=13316096i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
|
||||||
|
dcos_app,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
|
||||||
|
dcos_container,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18,task_name=hello-world cpus_limit=0.6,cpus_system_time=25.6,cpus_throttled_time=327.977109217,cpus_user_time=566.54,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=1107296256i,mem_total_bytes=335941632i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
|
||||||
|
dcos_app,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
|
||||||
|
dcos_app,cluster=enterprise,container_id=c76e1488-4fb7-4010-a4cf-25725f8173f9,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
|
||||||
|
dcos_container,cluster=enterprise,container_id=cbe0b2f9-061f-44ac-8f15-4844229e8231,hostname=192.168.122.18,task_name=telegraf cpus_limit=0.2,cpus_system_time=8.109999999,cpus_throttled_time=93.183916045,cpus_user_time=17.97,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=167772160i,mem_total_bytes=0i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
|
||||||
|
dcos_container,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 cpus_limit=0.2,cpus_system_time=2.69,cpus_throttled_time=20.064861214,cpus_user_time=6.56,disk_limit_bytes=268435456i,disk_used_bytes=29360128i,mem_limit_bytes=297795584i,mem_total_bytes=13733888i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
|
||||||
|
dcos_app,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
|
||||||
|
```
|
|
@ -0,0 +1,332 @@
|
||||||
|
package dcos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
jwt "github.com/dgrijalva/jwt-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// How long to stay logged in for
|
||||||
|
loginDuration = 65 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client is an interface for communicating with the DC/OS API.
// ClusterClient in this file is the implementation that talks to a cluster
// over HTTP.
|
||||||
|
type Client interface {
|
||||||
|
SetToken(token string)
|

||||||
|
Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error)
|
||||||
|
GetSummary(ctx context.Context) (*Summary, error)
|
||||||
|
GetContainers(ctx context.Context, node string) ([]Container, error)
|
||||||
|
GetNodeMetrics(ctx context.Context, node string) (*Metrics, error)
|
||||||
|
GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error)
|
||||||
|
GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// APIError is the error returned when the cluster answers a request with an
// unexpected HTTP status. Login fills Title and Description from the JSON
// error body when it can be decoded; otherwise Title is the HTTP status line.
type APIError struct {
|
||||||
|
// StatusCode is the HTTP status code of the response.
StatusCode int
|
||||||
|
Title string
|
||||||
|
// Description is optional; when empty, Error() returns Title alone.
Description string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Login is request data for logging in. It is marshalled to JSON and POSTed
// to /acs/api/v1/auth/login by ClusterClient.Login.
|
||||||
|
type Login struct {
|
||||||
|
// UID is set to the service account ID.
UID string `json:"uid"`
|
||||||
|
// Exp is the requested expiry as a Unix timestamp in seconds.
Exp int64 `json:"exp"`
|
||||||
|
// Token is the signed JWT produced by createLoginToken.
Token string `json:"token"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoginError is the response when login fails. ClusterClient.Login decodes it
// from the body of any login response whose status is not 200 OK and copies
// both fields into the returned APIError.
|
||||||
|
type LoginError struct {
|
||||||
|
Title string `json:"title"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoginAuth is the response to a successful login. Its Token becomes the Text
// of the AuthToken returned by ClusterClient.Login.
|
||||||
|
type LoginAuth struct {
|
||||||
|
Token string `json:"token"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slave is a node in the cluster, as listed in Summary.Slaves.
|
||||||
|
type Slave struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Summary provides high level cluster wide information. GetSummary decodes it
// from the /mesos/master/state-summary response. The fields carry no JSON
// tags, so decoding relies on encoding/json's case-insensitive key matching.
|
||||||
|
type Summary struct {
|
||||||
|
Cluster string
|
||||||
|
Slaves []Slave
|
||||||
|
}
|
||||||
|
|
||||||
|
// Container is a container on a node. GetContainers builds one per ID string
// returned by the node's containers endpoint.
|
||||||
|
type Container struct {
|
||||||
|
ID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataPoint is a single entry of Metrics.Datapoints as decoded from the JSON
// returned by the metrics endpoints.
type DataPoint struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics are the DC/OS metrics returned by the node, container and app
// metrics endpoints (see getMetrics).
|
||||||
|
type Metrics struct {
|
||||||
|
Datapoints []DataPoint `json:"datapoints"`
|
||||||
|
// Dimensions is kept as a generic map; its values are not given a
// concrete type here.
Dimensions map[string]interface{} `json:"dimensions"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AuthToken is the authentication token returned by a successful Login.
|
||||||
|
type AuthToken struct {
|
||||||
|
// Text is the token string taken from the login response.
Text string
|
||||||
|
// Expire is the expiry Login requested: the time of the login call plus
// loginDuration.
Expire time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClusterClient is a Client that uses the cluster URL.
|
||||||
|
type ClusterClient struct {
|
||||||
|
// clusterURL is the base URL; request paths are applied to it by url().
clusterURL *url.URL
|
||||||
|
httpClient *http.Client
|
||||||
|
// credentials is not set by NewClusterClient.
credentials *Credentials
|
||||||
|
// token is sent in the Authorization header of GET requests when
// non-empty; doGet clears it after a 401 response.
token string
|
||||||
|
// semaphore bounds the number of GET requests in flight (capacity is
// maxConns from NewClusterClient).
semaphore chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// claims is the JWT claim set signed by createLoginToken: the service account
// ID plus the embedded standard claims.
type claims struct {
|
||||||
|
UID string `json:"uid"`
|
||||||
|
jwt.StandardClaims
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e APIError) Error() string {
|
||||||
|
if e.Description != "" {
|
||||||
|
return fmt.Sprintf("%s: %s", e.Title, e.Description)
|
||||||
|
}
|
||||||
|
return e.Title
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClusterClient(
|
||||||
|
clusterURL *url.URL,
|
||||||
|
timeout time.Duration,
|
||||||
|
maxConns int,
|
||||||
|
tlsConfig *tls.Config,
|
||||||
|
) *ClusterClient {
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: maxConns,
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
},
|
||||||
|
Timeout: timeout,
|
||||||
|
}
|
||||||
|
semaphore := make(chan struct{}, maxConns)
|
||||||
|
|
||||||
|
c := &ClusterClient{
|
||||||
|
clusterURL: clusterURL,
|
||||||
|
httpClient: httpClient,
|
||||||
|
semaphore: semaphore,
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) SetToken(token string) {
|
||||||
|
c.token = token
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) {
|
||||||
|
token, err := c.createLoginToken(sa)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
exp := time.Now().Add(loginDuration)
|
||||||
|
|
||||||
|
body := &Login{
|
||||||
|
UID: sa.AccountID,
|
||||||
|
Exp: exp.Unix(),
|
||||||
|
Token: token,
|
||||||
|
}
|
||||||
|
|
||||||
|
octets, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
auth := &LoginAuth{}
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
err = dec.Decode(auth)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token := &AuthToken{
|
||||||
|
Text: auth.Token,
|
||||||
|
Expire: exp,
|
||||||
|
}
|
||||||
|
return token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
loginError := &LoginError{}
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
err = dec.Decode(loginError)
|
||||||
|
if err != nil {
|
||||||
|
err := &APIError{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
Title: resp.Status,
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = &APIError{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
Title: loginError.Title,
|
||||||
|
Description: loginError.Description,
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) {
|
||||||
|
summary := &Summary{}
|
||||||
|
err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return summary, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Container, error) {
|
||||||
|
list := []string{}
|
||||||
|
|
||||||
|
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node)
|
||||||
|
err := c.doGet(ctx, c.url(path), &list)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
containers := make([]Container, 0, len(list))
|
||||||
|
for _, c := range list {
|
||||||
|
containers = append(containers, Container{ID: c})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return containers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, error) {
|
||||||
|
metrics := &Metrics{}
|
||||||
|
|
||||||
|
err := c.doGet(ctx, url, metrics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) {
|
||||||
|
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node)
|
||||||
|
return c.getMetrics(ctx, c.url(path))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) {
|
||||||
|
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container)
|
||||||
|
return c.getMetrics(ctx, c.url(path))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) {
|
||||||
|
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container)
|
||||||
|
return c.getMetrics(ctx, c.url(path))
|
||||||
|
}
|
||||||
|
|
||||||
|
// createGetRequest builds a GET request for url that asks for a JSON
// response. When token is non-empty it is sent as "token=<token>" in the
// Authorization header; an empty token leaves that header unset.
func createGetRequest(url string, token string) (*http.Request, error) {
	request, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}

	request.Header.Add("Accept", "application/json")
	if token != "" {
		request.Header.Add("Authorization", "token="+token)
	}

	return request, nil
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) error {
|
||||||
|
req, err := createGetRequest(url, c.token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.semaphore <- struct{}{}:
|
||||||
|
break
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
<-c.semaphore
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
resp.Body.Close()
|
||||||
|
<-c.semaphore
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Clear invalid token if unauthorized
|
||||||
|
if resp.StatusCode == http.StatusUnauthorized {
|
||||||
|
c.token = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return &APIError{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
Title: resp.Status,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNoContent {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(v)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) url(path string) string {
|
||||||
|
url := c.clusterURL
|
||||||
|
url.Path = path
|
||||||
|
return url.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) {
|
||||||
|
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{
|
||||||
|
UID: sa.AccountID,
|
||||||
|
StandardClaims: jwt.StandardClaims{
|
||||||
|
// How long we have to login with this token
|
||||||
|
ExpiresAt: int64(5 * time.Minute / time.Second),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return token.SignedString(sa.PrivateKey)
|
||||||
|
}
|
|
@ -0,0 +1,232 @@
|
||||||
|
package dcos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
jwt "github.com/dgrijalva/jwt-go"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
privateKey = `-----BEGIN RSA PRIVATE KEY-----
|
||||||
|
MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj
|
||||||
|
XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6
|
||||||
|
4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB
|
||||||
|
AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4
|
||||||
|
I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd
|
||||||
|
bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj
|
||||||
|
hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD
|
||||||
|
dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM
|
||||||
|
PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q
|
||||||
|
EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub
|
||||||
|
BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo
|
||||||
|
Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G
|
||||||
|
P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge
|
||||||
|
-----END RSA PRIVATE KEY-----`
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLogin(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
responseCode int
|
||||||
|
responseBody string
|
||||||
|
expectedError error
|
||||||
|
expectedToken string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Login successful",
|
||||||
|
responseCode: 200,
|
||||||
|
responseBody: `{"token": "XXX.YYY.ZZZ"}`,
|
||||||
|
expectedError: nil,
|
||||||
|
expectedToken: "XXX.YYY.ZZZ",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Unauthorized Error",
|
||||||
|
responseCode: http.StatusUnauthorized,
|
||||||
|
responseBody: `{"title": "x", "description": "y"}`,
|
||||||
|
expectedError: &APIError{http.StatusUnauthorized, "x", "y"},
|
||||||
|
expectedToken: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := jwt.ParseRSAPrivateKeyFromPEM([]byte(privateKey))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(tt.responseCode)
|
||||||
|
fmt.Fprintln(w, tt.responseBody)
|
||||||
|
})
|
||||||
|
ts := httptest.NewServer(handler)
|
||||||
|
u, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
sa := &ServiceAccount{
|
||||||
|
AccountID: "telegraf",
|
||||||
|
PrivateKey: key,
|
||||||
|
}
|
||||||
|
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
|
||||||
|
auth, err := client.Login(ctx, sa)
|
||||||
|
|
||||||
|
require.Equal(t, tt.expectedError, err)
|
||||||
|
|
||||||
|
if tt.expectedToken != "" {
|
||||||
|
require.Equal(t, tt.expectedToken, auth.Text)
|
||||||
|
} else {
|
||||||
|
require.Nil(t, auth)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetSummary(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
responseCode int
|
||||||
|
responseBody string
|
||||||
|
expectedValue *Summary
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "No nodes",
|
||||||
|
responseCode: 200,
|
||||||
|
responseBody: `{"cluster": "a", "slaves": []}`,
|
||||||
|
expectedValue: &Summary{Cluster: "a", Slaves: []Slave{}},
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Unauthorized Error",
|
||||||
|
responseCode: http.StatusUnauthorized,
|
||||||
|
responseBody: `<html></html>`,
|
||||||
|
expectedValue: nil,
|
||||||
|
expectedError: &APIError{StatusCode: http.StatusUnauthorized, Title: "401 Unauthorized"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Has nodes",
|
||||||
|
responseCode: 200,
|
||||||
|
responseBody: `{"cluster": "a", "slaves": [{"id": "a"}, {"id": "b"}]}`,
|
||||||
|
expectedValue: &Summary{
|
||||||
|
Cluster: "a",
|
||||||
|
Slaves: []Slave{
|
||||||
|
Slave{ID: "a"},
|
||||||
|
Slave{ID: "b"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// check the path
|
||||||
|
w.WriteHeader(tt.responseCode)
|
||||||
|
fmt.Fprintln(w, tt.responseBody)
|
||||||
|
})
|
||||||
|
ts := httptest.NewServer(handler)
|
||||||
|
u, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
|
||||||
|
summary, err := client.GetSummary(ctx)
|
||||||
|
|
||||||
|
require.Equal(t, tt.expectedError, err)
|
||||||
|
require.Equal(t, tt.expectedValue, summary)
|
||||||
|
|
||||||
|
ts.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetNodeMetrics(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
responseCode int
|
||||||
|
responseBody string
|
||||||
|
expectedValue *Metrics
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Empty Body",
|
||||||
|
responseCode: 200,
|
||||||
|
responseBody: `{}`,
|
||||||
|
expectedValue: &Metrics{},
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// check the path
|
||||||
|
w.WriteHeader(tt.responseCode)
|
||||||
|
fmt.Fprintln(w, tt.responseBody)
|
||||||
|
})
|
||||||
|
ts := httptest.NewServer(handler)
|
||||||
|
u, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
|
||||||
|
m, err := client.GetNodeMetrics(ctx, "foo")
|
||||||
|
|
||||||
|
require.Equal(t, tt.expectedError, err)
|
||||||
|
require.Equal(t, tt.expectedValue, m)
|
||||||
|
|
||||||
|
ts.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetContainerMetrics(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
responseCode int
|
||||||
|
responseBody string
|
||||||
|
expectedValue *Metrics
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "204 No Contents",
|
||||||
|
responseCode: 204,
|
||||||
|
responseBody: ``,
|
||||||
|
expectedValue: &Metrics{},
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// check the path
|
||||||
|
w.WriteHeader(tt.responseCode)
|
||||||
|
fmt.Fprintln(w, tt.responseBody)
|
||||||
|
})
|
||||||
|
ts := httptest.NewServer(handler)
|
||||||
|
u, err := url.Parse(ts.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
|
||||||
|
m, err := client.GetContainerMetrics(ctx, "foo", "bar")
|
||||||
|
|
||||||
|
require.Equal(t, tt.expectedError, err)
|
||||||
|
require.Equal(t, tt.expectedValue, m)
|
||||||
|
|
||||||
|
ts.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
package dcos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rsa"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
// relogDuration is how long before a token's expiration it should be renewed.
const relogDuration = 5 * time.Minute
|
||||||
|
|
||||||
|
type Credentials interface {
|
||||||
|
Token(ctx context.Context, client Client) (string, error)
|
||||||
|
IsExpired() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServiceAccount struct {
|
||||||
|
AccountID string
|
||||||
|
PrivateKey *rsa.PrivateKey
|
||||||
|
|
||||||
|
auth *AuthToken
|
||||||
|
}
|
||||||
|
|
||||||
|
// TokenCreds are credentials backed by a file that contains a login token.
type TokenCreds struct {
	// Path is the location of the token file.
	Path string
}
|
||||||
|
|
||||||
|
// NullCreds are credentials that carry no token at all.
type NullCreds struct{}
|
||||||
|
|
||||||
|
func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) {
|
||||||
|
auth, err := client.Login(ctx, c)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
c.auth = auth
|
||||||
|
return auth.Text, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceAccount) IsExpired() bool {
|
||||||
|
return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TokenCreds) Token(ctx context.Context, client Client) (string, error) {
|
||||||
|
octets, err := ioutil.ReadFile(c.Path)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("Error reading token file %q: %s", c.Path, err)
|
||||||
|
}
|
||||||
|
if !utf8.Valid(octets) {
|
||||||
|
return "", fmt.Errorf("Token file does not contain utf-8 encoded text: %s", c.Path)
|
||||||
|
}
|
||||||
|
token := strings.TrimSpace(string(octets))
|
||||||
|
return token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TokenCreds) IsExpired() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *NullCreds) Token(ctx context.Context, client Client) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *NullCreds) IsExpired() bool {
|
||||||
|
return true
|
||||||
|
}
|
|
@ -0,0 +1,435 @@
|
||||||
|
package dcos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/url"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
jwt "github.com/dgrijalva/jwt-go"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultMaxConnections is the MaxConnections value a newly registered plugin
// instance starts with.
const defaultMaxConnections = 10

// defaultResponseTimeout is the ResponseTimeout a newly registered plugin
// instance starts with.
const defaultResponseTimeout = 20 * time.Second
|
||||||
|
|
||||||
|
// nodeDimensions lists the tag keys copied onto "dcos_node" measurements.
var nodeDimensions = []string{"hostname", "path", "interface"}

// containerDimensions lists the tag keys copied onto "dcos_container"
// measurements.
var containerDimensions = []string{"hostname", "container_id", "task_name"}

// appDimensions lists the tag keys copied onto "dcos_app" measurements.
var appDimensions = []string{"hostname", "container_id", "task_name"}
|
||||||
|
|
||||||
|
type DCOS struct {
|
||||||
|
ClusterURL string `toml:"cluster_url"`
|
||||||
|
|
||||||
|
ServiceAccountID string `toml:"service_account_id"`
|
||||||
|
ServiceAccountPrivateKey string
|
||||||
|
|
||||||
|
TokenFile string
|
||||||
|
|
||||||
|
NodeInclude []string
|
||||||
|
NodeExclude []string
|
||||||
|
ContainerInclude []string
|
||||||
|
ContainerExclude []string
|
||||||
|
AppInclude []string
|
||||||
|
AppExclude []string
|
||||||
|
|
||||||
|
MaxConnections int
|
||||||
|
ResponseTimeout internal.Duration
|
||||||
|
|
||||||
|
SSLCA string `toml:"ssl_ca"`
|
||||||
|
SSLCert string `toml:"ssl_cert"`
|
||||||
|
SSLKey string `toml:"ssl_key"`
|
||||||
|
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
|
||||||
|
|
||||||
|
client Client
|
||||||
|
creds Credentials
|
||||||
|
|
||||||
|
initialized bool
|
||||||
|
nodeFilter filter.Filter
|
||||||
|
containerFilter filter.Filter
|
||||||
|
appFilter filter.Filter
|
||||||
|
taskNameFilter filter.Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) Description() string {
|
||||||
|
return "Input plugin for DC/OS metrics"
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## The DC/OS cluster URL.
|
||||||
|
cluster_url = "https://dcos-ee-master-1"
|
||||||
|
|
||||||
|
## The ID of the service account.
|
||||||
|
service_account_id = "telegraf"
|
||||||
|
## The private key file for the service account.
|
||||||
|
service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem"
|
||||||
|
|
||||||
|
## Path containing login token. If set, will read on every gather.
|
||||||
|
# token_file = "/home/dcos/.dcos/token"
|
||||||
|
|
||||||
|
## In all filter options if both include and exclude are empty all items
|
||||||
|
## will be collected. Arrays may contain glob patterns.
|
||||||
|
##
|
||||||
|
## Node IDs to collect metrics from. If a node is excluded, no metrics will
|
||||||
|
## be collected for its containers or apps.
|
||||||
|
# node_include = []
|
||||||
|
# node_exclude = []
|
||||||
|
## Container IDs to collect container metrics from.
|
||||||
|
# container_include = []
|
||||||
|
# container_exclude = []
|
||||||
|
## Container IDs to collect app metrics from.
|
||||||
|
# app_include = []
|
||||||
|
# app_exclude = []
|
||||||
|
|
||||||
|
## Maximum concurrent connections to the cluster.
|
||||||
|
# max_connections = 10
|
||||||
|
## Maximum time to receive a response from cluster.
|
||||||
|
# response_timeout = "20s"
|
||||||
|
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use SSL but skip chain & host verification
|
||||||
|
# insecure_skip_verify = true
|
||||||
|
|
||||||
|
## Recommended filtering to reduce series cardinality.
|
||||||
|
# [inputs.dcos.tagdrop]
|
||||||
|
# path = ["/var/lib/mesos/slave/slaves/*"]
|
||||||
|
`
|
||||||
|
|
||||||
|
func (d *DCOS) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) Gather(acc telegraf.Accumulator) error {
|
||||||
|
err := d.init()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
token, err := d.creds.Token(ctx, d.client)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.client.SetToken(token)
|
||||||
|
|
||||||
|
summary, err := d.client.GetSummary(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, node := range summary.Slaves {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(node string) {
|
||||||
|
defer wg.Done()
|
||||||
|
d.GatherNode(ctx, acc, summary.Cluster, node)
|
||||||
|
}(node.ID)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
|
||||||
|
if !d.nodeFilter.Match(node) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
m, err := d.client.GetNodeMetrics(ctx, node)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.addNodeMetrics(acc, cluster, m)
|
||||||
|
}()
|
||||||
|
|
||||||
|
d.GatherContainers(ctx, acc, cluster, node)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
|
||||||
|
containers, err := d.client.GetContainers(ctx, node)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, container := range containers {
|
||||||
|
if d.containerFilter.Match(container.ID) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(container string) {
|
||||||
|
defer wg.Done()
|
||||||
|
m, err := d.client.GetContainerMetrics(ctx, node, container)
|
||||||
|
if err != nil {
|
||||||
|
if err, ok := err.(APIError); ok && err.StatusCode == 404 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.addContainerMetrics(acc, cluster, m)
|
||||||
|
}(container.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.appFilter.Match(container.ID) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(container string) {
|
||||||
|
defer wg.Done()
|
||||||
|
m, err := d.client.GetAppMetrics(ctx, node, container)
|
||||||
|
if err != nil {
|
||||||
|
if err, ok := err.(APIError); ok && err.StatusCode == 404 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.addAppMetrics(acc, cluster, m)
|
||||||
|
}(container.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// point is one series assembled from a Metrics document: its tags, the
// labels taken from the document's dimensions, and its field values.
type point struct {
	tags   map[string]string
	labels map[string]string
	fields map[string]interface{}
}
|
||||||
|
|
||||||
|
func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point {
|
||||||
|
points := make(map[string]*point)
|
||||||
|
for _, dp := range m.Datapoints {
|
||||||
|
fieldKey := strings.Replace(dp.Name, ".", "_", -1)
|
||||||
|
|
||||||
|
tags := dp.Tags
|
||||||
|
if tags == nil {
|
||||||
|
tags = make(map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dp.Unit == "bytes" && !strings.HasSuffix(fieldKey, "_bytes") {
|
||||||
|
fieldKey = fieldKey + "_bytes"
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.HasPrefix(fieldKey, "dcos_metrics_module_") {
|
||||||
|
fieldKey = strings.TrimPrefix(fieldKey, "dcos_metrics_module_")
|
||||||
|
}
|
||||||
|
|
||||||
|
tagset := make([]string, 0, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
tagset = append(tagset, k+"="+v)
|
||||||
|
}
|
||||||
|
sort.Strings(tagset)
|
||||||
|
seriesParts := make([]string, 0, len(tagset))
|
||||||
|
seriesParts = append(seriesParts, tagset...)
|
||||||
|
seriesKey := strings.Join(seriesParts, ",")
|
||||||
|
|
||||||
|
p, ok := points[seriesKey]
|
||||||
|
if !ok {
|
||||||
|
p = &point{}
|
||||||
|
p.tags = tags
|
||||||
|
p.labels = make(map[string]string)
|
||||||
|
p.fields = make(map[string]interface{})
|
||||||
|
points[seriesKey] = p
|
||||||
|
}
|
||||||
|
|
||||||
|
if dp.Unit == "bytes" {
|
||||||
|
p.fields[fieldKey] = int64(dp.Value)
|
||||||
|
} else {
|
||||||
|
p.fields[fieldKey] = dp.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make([]*point, 0, len(points))
|
||||||
|
for _, p := range points {
|
||||||
|
for k, v := range m.Dimensions {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case string:
|
||||||
|
p.tags[k] = v
|
||||||
|
case map[string]string:
|
||||||
|
if k == "labels" {
|
||||||
|
for k, v := range v {
|
||||||
|
p.labels[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results = append(results, p)
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) addMetrics(acc telegraf.Accumulator, cluster, mname string, m *Metrics, tagDimensions []string) {
|
||||||
|
tm := time.Now()
|
||||||
|
|
||||||
|
points := d.createPoints(acc, m)
|
||||||
|
|
||||||
|
for _, p := range points {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
tags["cluster"] = cluster
|
||||||
|
for _, tagkey := range tagDimensions {
|
||||||
|
v, ok := p.tags[tagkey]
|
||||||
|
if ok {
|
||||||
|
tags[tagkey] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range p.labels {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(mname, p.fields, tags, tm)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) addNodeMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) {
|
||||||
|
d.addMetrics(acc, cluster, "dcos_node", m, nodeDimensions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) addContainerMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) {
|
||||||
|
d.addMetrics(acc, cluster, "dcos_container", m, containerDimensions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) {
|
||||||
|
d.addMetrics(acc, cluster, "dcos_app", m, appDimensions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) init() error {
|
||||||
|
if !d.initialized {
|
||||||
|
err := d.createFilters()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.client == nil {
|
||||||
|
client, err := d.createClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.client = client
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.creds == nil {
|
||||||
|
creds, err := d.createCredentials()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.creds = creds
|
||||||
|
}
|
||||||
|
|
||||||
|
d.initialized = true
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) createClient() (Client, error) {
|
||||||
|
tlsCfg, err := internal.GetTLSConfig(
|
||||||
|
d.SSLCert, d.SSLKey, d.SSLCA, d.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
url, err := url.Parse(d.ClusterURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewClusterClient(
|
||||||
|
url,
|
||||||
|
d.ResponseTimeout.Duration,
|
||||||
|
d.MaxConnections,
|
||||||
|
tlsCfg,
|
||||||
|
)
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) createCredentials() (Credentials, error) {
|
||||||
|
if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" {
|
||||||
|
bs, err := ioutil.ReadFile(d.ServiceAccountPrivateKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
privateKey, err := jwt.ParseRSAPrivateKeyFromPEM(bs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
creds := &ServiceAccount{
|
||||||
|
AccountID: d.ServiceAccountID,
|
||||||
|
PrivateKey: privateKey,
|
||||||
|
}
|
||||||
|
return creds, nil
|
||||||
|
} else if d.TokenFile != "" {
|
||||||
|
creds := &TokenCreds{
|
||||||
|
Path: d.TokenFile,
|
||||||
|
}
|
||||||
|
return creds, nil
|
||||||
|
} else {
|
||||||
|
creds := &NullCreds{}
|
||||||
|
return creds, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DCOS) createFilters() error {
|
||||||
|
var err error
|
||||||
|
d.nodeFilter, err = filter.NewIncludeExcludeFilter(
|
||||||
|
d.NodeInclude, d.NodeExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
d.containerFilter, err = filter.NewIncludeExcludeFilter(
|
||||||
|
d.ContainerInclude, d.ContainerExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
d.appFilter, err = filter.NewIncludeExcludeFilter(
|
||||||
|
d.AppInclude, d.AppExclude)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("dcos", func() telegraf.Input {
|
||||||
|
return &DCOS{
|
||||||
|
MaxConnections: defaultMaxConnections,
|
||||||
|
ResponseTimeout: internal.Duration{
|
||||||
|
Duration: defaultResponseTimeout,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,441 @@
|
||||||
|
package dcos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockClient struct {
|
||||||
|
SetTokenF func(token string)
|
||||||
|
LoginF func(ctx context.Context, sa *ServiceAccount) (*AuthToken, error)
|
||||||
|
GetSummaryF func(ctx context.Context) (*Summary, error)
|
||||||
|
GetContainersF func(ctx context.Context, node string) ([]Container, error)
|
||||||
|
GetNodeMetricsF func(ctx context.Context, node string) (*Metrics, error)
|
||||||
|
GetContainerMetricsF func(ctx context.Context, node, container string) (*Metrics, error)
|
||||||
|
GetAppMetricsF func(ctx context.Context, node, container string) (*Metrics, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) SetToken(token string) {
|
||||||
|
c.SetTokenF(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) {
|
||||||
|
return c.LoginF(ctx, sa)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) GetSummary(ctx context.Context) (*Summary, error) {
|
||||||
|
return c.GetSummaryF(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) GetContainers(ctx context.Context, node string) ([]Container, error) {
|
||||||
|
return c.GetContainersF(ctx, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) {
|
||||||
|
return c.GetNodeMetricsF(ctx, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) {
|
||||||
|
return c.GetContainerMetricsF(ctx, node, container)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) {
|
||||||
|
return c.GetAppMetricsF(ctx, node, container)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddNodeMetrics(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
metrics *Metrics
|
||||||
|
check func(*testutil.Accumulator) []bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "basic datapoint conversion",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "process.count",
|
||||||
|
Unit: "count",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
},
|
||||||
|
"process_count", 42.0,
|
||||||
|
)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "path added as tag",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "filesystem.inode.free",
|
||||||
|
Tags: map[string]string{
|
||||||
|
"path": "/var/lib",
|
||||||
|
},
|
||||||
|
Unit: "count",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"path": "/var/lib",
|
||||||
|
},
|
||||||
|
"filesystem_inode_free", 42.0,
|
||||||
|
)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "interface added as tag",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "network.out.dropped",
|
||||||
|
Tags: map[string]string{
|
||||||
|
"interface": "eth0",
|
||||||
|
},
|
||||||
|
Unit: "count",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"interface": "eth0",
|
||||||
|
},
|
||||||
|
"network_out_dropped", 42.0,
|
||||||
|
)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bytes unit appended to fieldkey",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "network.in",
|
||||||
|
Tags: map[string]string{
|
||||||
|
"interface": "eth0",
|
||||||
|
},
|
||||||
|
Unit: "bytes",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"interface": "eth0",
|
||||||
|
},
|
||||||
|
"network_in_bytes", int64(42),
|
||||||
|
)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dimensions added as tags",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "process.count",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
Unit: "count",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "memory.total",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
Unit: "bytes",
|
||||||
|
Value: 42,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dimensions: map[string]interface{}{
|
||||||
|
"cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
"mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
},
|
||||||
|
"process_count", 42.0),
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
},
|
||||||
|
"memory_total_bytes", int64(42)),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
dcos := &DCOS{}
|
||||||
|
dcos.addNodeMetrics(&acc, "a", tt.metrics)
|
||||||
|
for i, ok := range tt.check(&acc) {
|
||||||
|
require.True(t, ok, fmt.Sprintf("Index was not true: %d", i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddContainerMetrics(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
metrics *Metrics
|
||||||
|
check func(*testutil.Accumulator) []bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "container",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "net.rx.errors",
|
||||||
|
Tags: map[string]string{
|
||||||
|
"container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb",
|
||||||
|
"executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a",
|
||||||
|
"executor_name": "Command Executor (Task: telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a) (Command: NO EXECUTABLE)",
|
||||||
|
"framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001",
|
||||||
|
"source": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a",
|
||||||
|
},
|
||||||
|
Unit: "count",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dimensions: map[string]interface{}{
|
||||||
|
"cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63",
|
||||||
|
"container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb",
|
||||||
|
"executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a",
|
||||||
|
"framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001",
|
||||||
|
"framework_name": "marathon",
|
||||||
|
"framework_principal": "dcos_marathon",
|
||||||
|
"framework_role": "slave_public",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
"labels": map[string]string{
|
||||||
|
"DCOS_SPACE": "/telegraf",
|
||||||
|
},
|
||||||
|
"mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1",
|
||||||
|
"task_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a",
|
||||||
|
"task_name": "telegraf",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_container",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
"task_name": "telegraf",
|
||||||
|
"DCOS_SPACE": "/telegraf",
|
||||||
|
},
|
||||||
|
"net_rx_errors",
|
||||||
|
42.0,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
dcos := &DCOS{}
|
||||||
|
dcos.addContainerMetrics(&acc, "a", tt.metrics)
|
||||||
|
for i, ok := range tt.check(&acc) {
|
||||||
|
require.True(t, ok, fmt.Sprintf("Index was not true: %d", i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddAppMetrics(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
metrics *Metrics
|
||||||
|
check func(*testutil.Accumulator) []bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "tags are optional",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "dcos.metrics.module.container_throttled_bytes_per_sec",
|
||||||
|
Unit: "",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_app",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
},
|
||||||
|
"container_throttled_bytes_per_sec", 42.0,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dimensions are tagged",
|
||||||
|
metrics: &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "dcos.metrics.module.container_throttled_bytes_per_sec",
|
||||||
|
Unit: "",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dimensions: map[string]interface{}{
|
||||||
|
"cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63",
|
||||||
|
"container_id": "02d31175-1c01-4459-8520-ef8b1339bc52",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
"mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_app",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"container_id": "02d31175-1c01-4459-8520-ef8b1339bc52",
|
||||||
|
"hostname": "192.168.122.18",
|
||||||
|
},
|
||||||
|
"container_throttled_bytes_per_sec", 42.0,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
dcos := &DCOS{}
|
||||||
|
dcos.addAppMetrics(&acc, "a", tt.metrics)
|
||||||
|
for i, ok := range tt.check(&acc) {
|
||||||
|
require.True(t, ok, fmt.Sprintf("Index was not true: %d", i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherFilterNode(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
nodeInclude []string
|
||||||
|
nodeExclude []string
|
||||||
|
client Client
|
||||||
|
check func(*testutil.Accumulator) []bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "cluster without nodes has no metrics",
|
||||||
|
client: &mockClient{
|
||||||
|
SetTokenF: func(token string) {},
|
||||||
|
GetSummaryF: func(ctx context.Context) (*Summary, error) {
|
||||||
|
return &Summary{
|
||||||
|
Cluster: "a",
|
||||||
|
Slaves: []Slave{},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.NMetrics() == 0,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "node include",
|
||||||
|
nodeInclude: []string{"x"},
|
||||||
|
client: &mockClient{
|
||||||
|
SetTokenF: func(token string) {},
|
||||||
|
GetSummaryF: func(ctx context.Context) (*Summary, error) {
|
||||||
|
return &Summary{
|
||||||
|
Cluster: "a",
|
||||||
|
Slaves: []Slave{
|
||||||
|
Slave{ID: "x"},
|
||||||
|
Slave{ID: "y"},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
GetContainersF: func(ctx context.Context, node string) ([]Container, error) {
|
||||||
|
return []Container{}, nil
|
||||||
|
},
|
||||||
|
GetNodeMetricsF: func(ctx context.Context, node string) (*Metrics, error) {
|
||||||
|
return &Metrics{
|
||||||
|
Datapoints: []DataPoint{
|
||||||
|
{
|
||||||
|
Name: "value",
|
||||||
|
Value: 42.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dimensions: map[string]interface{}{
|
||||||
|
"hostname": "x",
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
check: func(acc *testutil.Accumulator) []bool {
|
||||||
|
return []bool{
|
||||||
|
acc.HasPoint(
|
||||||
|
"dcos_node",
|
||||||
|
map[string]string{
|
||||||
|
"cluster": "a",
|
||||||
|
"hostname": "x",
|
||||||
|
},
|
||||||
|
"value", 42.0,
|
||||||
|
),
|
||||||
|
acc.NMetrics() == 1,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
dcos := &DCOS{
|
||||||
|
NodeInclude: tt.nodeInclude,
|
||||||
|
NodeExclude: tt.nodeExclude,
|
||||||
|
client: tt.client,
|
||||||
|
}
|
||||||
|
err := dcos.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, ok := range tt.check(&acc) {
|
||||||
|
require.True(t, ok, fmt.Sprintf("Index was not true: %d", i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue