diff --git a/Godeps b/Godeps index 7e90dd061..8c13d6db8 100644 --- a/Godeps +++ b/Godeps @@ -10,6 +10,7 @@ github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28 github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29 github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6 github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 +github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29 github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27 github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1324e7740..ec66f64f4 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -15,6 +15,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/consul" _ "github.com/influxdata/telegraf/plugins/inputs/couchbase" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" + _ "github.com/influxdata/telegraf/plugins/inputs/dcos" _ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/dmcache" _ "github.com/influxdata/telegraf/plugins/inputs/dns_query" diff --git a/plugins/inputs/dcos/README.md b/plugins/inputs/dcos/README.md new file mode 100644 index 000000000..a1384402f --- /dev/null +++ b/plugins/inputs/dcos/README.md @@ -0,0 +1,209 @@ +# DC/OS Input Plugin + +This input plugin gathers metrics from a DC/OS cluster's [metrics component](https://docs.mesosphere.com/1.10/metrics/). + +**Series Cardinality Warning** + +Depending on the work load of your DC/OS cluster, this plugin can quickly +create a high number of series which, when unchecked, can cause high load on +your database. 
+ +- Use [measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering) liberally to exclude unneeded metrics as well as the node, container, and app include/exclude options. +- Write to a database with an appropriate [retention policy](https://docs.influxdata.com/influxdb/v1.3/concepts/glossary/#retention-policy-rp). +- Limit the number of series allowed in your database using the `max-series-per-database` and `max-values-per-tag` settings. +- Consider enabling the [TSI](https://docs.influxdata.com/influxdb/v1.3/about_the_project/releasenotes-changelog/#release-notes-8) engine. +- Monitor your [series cardinality](https://docs.influxdata.com/influxdb/v1.3/troubleshooting/frequently-asked-questions/#how-can-i-query-for-series-cardinality). + +### Configuration: +```toml +[[inputs.dcos]] + ## The DC/OS cluster URL. + cluster_url = "https://dcos-master-1" + + ## The ID of the service account. + service_account_id = "telegraf" + ## The private key file for the service account. + service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem" + + ## Path containing login token. If set, will read on every gather. + # token_file = "/home/dcos/.dcos/token" + + ## In all filter options if both include and exclude are empty all items + ## will be collected. Arrays may contain glob patterns. + ## + ## Node IDs to collect metrics from. If a node is excluded, no metrics will + ## be collected for its containers or apps. + # node_include = [] + # node_exclude = [] + ## Container IDs to collect container metrics from. + # container_include = [] + # container_exclude = [] + ## Container IDs to collect app metrics from. + # app_include = [] + # app_exclude = [] + + ## Maximum concurrent connections to the cluster. + # max_connections = 10 + ## Maximum time to receive a response from cluster. 
 + # response_timeout = "20s" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## If false, skip chain & host verification + # insecure_skip_verify = true + + ## Recommended filtering to reduce series cardinality. + # [inputs.dcos.tagdrop] + # path = ["/var/lib/mesos/slave/slaves/*"] +``` + +#### Enterprise Authentication + +When using Enterprise DC/OS, it is recommended to use a service account to +authenticate with the cluster. + +The plugin requires the following permissions: +``` +dcos:adminrouter:ops:system-metrics full +dcos:adminrouter:ops:mesos full +``` + +Follow the directions to [create a service account and assign permissions](https://docs.mesosphere.com/1.10/security/service-auth/custom-service-auth/). + +Quick configuration using the Enterprise CLI: +``` +dcos security org service-accounts keypair telegraf-sa-key.pem telegraf-sa-cert.pem +dcos security org service-accounts create -p telegraf-sa-cert.pem -d "Telegraf DC/OS input plugin" telegraf +dcos security org users grant telegraf dcos:adminrouter:ops:system-metrics full +dcos security org users grant telegraf dcos:adminrouter:ops:mesos full +``` + +#### Open Source Authentication + +The Open Source DC/OS does not provide service accounts. Instead you can use one +of the following options: + +1. [Disable authentication](https://dcos.io/docs/1.10/security/managing-authentication/#authentication-opt-out) +2. Use the `token_file` parameter to read an authentication token from a file. + +The `token_file` can then be set by using the `dcos` CLI to log in periodically. +The CLI login token is only valid for a limited time; you will need to ensure the CLI +performs a new login before this time expires. +``` +dcos auth login --username foo --password bar +dcos config show core.dcos_acs_token > ~/.dcos/token +``` + +Another option to create a `token_file` is to generate a token using the +cluster secret. 
This will allow you to set the expiration date manually or +even create a never-expiring token. However, if the cluster secret or the +token is compromised it cannot be revoked and may require a full reinstall of +the cluster. For more information on this technique, reference +[this blog post](https://medium.com/@richardgirges/authenticating-open-source-dc-os-with-third-party-services-125fa33a5add). + +### Metrics: + +Please consult the [Metrics Reference](https://docs.mesosphere.com/1.10/metrics/reference/) +for details on field interpretation. + +- dcos_node + - tags: + - cluster + - hostname + - path (filesystem fields only) + - interface (network fields only) + - fields: + - system_uptime (float) + - cpu_cores (float) + - cpu_total (float) + - cpu_user (float) + - cpu_system (float) + - cpu_idle (float) + - cpu_wait (float) + - load_1min (float) + - load_5min (float) + - load_15min (float) + - filesystem_capacity_total_bytes (int) + - filesystem_capacity_used_bytes (int) + - filesystem_capacity_free_bytes (int) + - filesystem_inode_total (float) + - filesystem_inode_used (float) + - filesystem_inode_free (float) + - memory_total_bytes (int) + - memory_free_bytes (int) + - memory_buffers_bytes (int) + - memory_cached_bytes (int) + - swap_total_bytes (int) + - swap_free_bytes (int) + - swap_used_bytes (int) + - network_in_bytes (int) + - network_out_bytes (int) + - network_in_packets (float) + - network_out_packets (float) + - network_in_dropped (float) + - network_out_dropped (float) + - network_in_errors (float) + - network_out_errors (float) + - process_count (float) + +- dcos_container + - tags: + - cluster + - hostname + - container_id + - task_name + - fields: + - cpus_limit (float) + - cpus_system_time (float) + - cpus_throttled_time (float) + - cpus_user_time (float) + - disk_limit_bytes (int) + - disk_used_bytes (int) + - mem_limit_bytes (int) + - mem_total_bytes (int) + - net_rx_bytes (int) + - net_rx_dropped (float) + - net_rx_errors (float) 
+ - net_rx_packets (float) + - net_tx_bytes (int) + - net_tx_dropped (float) + - net_tx_errors (float) + - net_tx_packets (float) + +- dcos_app + - tags: + - cluster + - hostname + - container_id + - task_name + - fields: + - fields are application specific + +### Example Output: + +``` +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/boot filesystem_capacity_free_bytes=918188032i,filesystem_capacity_total_bytes=1063256064i,filesystem_capacity_used_bytes=145068032i,filesystem_inode_free=523958,filesystem_inode_total=524288,filesystem_inode_used=330 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=dummy0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=docker0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18 cpu_cores=2,cpu_idle=81.62,cpu_system=4.19,cpu_total=13.670000000000002,cpu_user=9.48,cpu_wait=0,load_15min=0.7,load_1min=0.22,load_5min=0.6,memory_buffers_bytes=970752i,memory_cached_bytes=1830473728i,memory_free_bytes=1178636288i,memory_total_bytes=3975073792i,process_count=198,swap_free_bytes=859828224i,swap_total_bytes=859828224i,swap_used_bytes=0i,system_uptime=18874 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=lo network_in_bytes=1090992450i,network_in_dropped=0,network_in_errors=0,network_in_packets=1546938,network_out_bytes=1090992450i,network_out_dropped=0,network_out_errors=0,network_out_packets=1546938 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/ 
filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=minuteman network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=eth0 network_in_bytes=539886216i,network_in_dropped=1,network_in_errors=0,network_in_packets=979808,network_out_bytes=112395836i,network_out_dropped=0,network_out_errors=0,network_out_packets=891239 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=spartan network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/overlay filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=vtep1024 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/plugins filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000 
+dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=d-dcos network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=9a78d34a-3bbf-467e-81cf-a57737f154ee,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 cpus_limit=0.3,cpus_system_time=307.31,cpus_throttled_time=102.029930607,cpus_user_time=268.57,disk_limit_bytes=268435456i,disk_used_bytes=30953472i,mem_limit_bytes=570425344i,mem_total_bytes=13316096i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18,task_name=hello-world cpus_limit=0.6,cpus_system_time=25.6,cpus_throttled_time=327.977109217,cpus_user_time=566.54,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=1107296256i,mem_total_bytes=335941632i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=c76e1488-4fb7-4010-a4cf-25725f8173f9,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 
+dcos_container,cluster=enterprise,container_id=cbe0b2f9-061f-44ac-8f15-4844229e8231,hostname=192.168.122.18,task_name=telegraf cpus_limit=0.2,cpus_system_time=8.109999999,cpus_throttled_time=93.183916045,cpus_user_time=17.97,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=167772160i,mem_total_bytes=0i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_container,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 cpus_limit=0.2,cpus_system_time=2.69,cpus_throttled_time=20.064861214,cpus_user_time=6.56,disk_limit_bytes=268435456i,disk_used_bytes=29360128i,mem_limit_bytes=297795584i,mem_total_bytes=13733888i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000 +dcos_app,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000 +``` diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go new file mode 100644 index 000000000..71165e9fb --- /dev/null +++ b/plugins/inputs/dcos/client.go @@ -0,0 +1,332 @@ +package dcos + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + jwt "github.com/dgrijalva/jwt-go" +) + +const ( + // How long to stayed logged in for + loginDuration = 65 * time.Minute +) + +// Client is an interface for communicating with the DC/OS API. 
+type Client interface { + SetToken(token string) + + Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) + GetSummary(ctx context.Context) (*Summary, error) + GetContainers(ctx context.Context, node string) ([]Container, error) + GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) + GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) + GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) +} + +type APIError struct { + StatusCode int + Title string + Description string +} + +// Login is request data for logging in. +type Login struct { + UID string `json:"uid"` + Exp int64 `json:"exp"` + Token string `json:"token"` +} + +// LoginError is the response when login fails. +type LoginError struct { + Title string `json:"title"` + Description string `json:"description"` +} + +// LoginAuth is the response to a successful login. +type LoginAuth struct { + Token string `json:"token"` +} + +// Slave is a node in the cluster. +type Slave struct { + ID string `json:"id"` +} + +// Summary provides high level cluster wide information. +type Summary struct { + Cluster string + Slaves []Slave +} + +// Container is a container on a node. +type Container struct { + ID string +} + +type DataPoint struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Unit string `json:"unit"` + Value float64 `json:"value"` +} + +// Metrics are the DCOS metrics +type Metrics struct { + Datapoints []DataPoint `json:"datapoints"` + Dimensions map[string]interface{} `json:"dimensions"` +} + +// AuthToken is the authentication token. +type AuthToken struct { + Text string + Expire time.Time +} + +// ClusterClient is a Client that uses the cluster URL. 
+type ClusterClient struct { + clusterURL *url.URL + httpClient *http.Client + credentials *Credentials + token string + semaphore chan struct{} +} + +type claims struct { + UID string `json:"uid"` + jwt.StandardClaims +} + +func (e APIError) Error() string { + if e.Description != "" { + return fmt.Sprintf("%s: %s", e.Title, e.Description) + } + return e.Title +} + +func NewClusterClient( + clusterURL *url.URL, + timeout time.Duration, + maxConns int, + tlsConfig *tls.Config, +) *ClusterClient { + httpClient := &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: maxConns, + TLSClientConfig: tlsConfig, + }, + Timeout: timeout, + } + semaphore := make(chan struct{}, maxConns) + + c := &ClusterClient{ + clusterURL: clusterURL, + httpClient: httpClient, + semaphore: semaphore, + } + return c +} + +func (c *ClusterClient) SetToken(token string) { + c.token = token +} + +func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { + token, err := c.createLoginToken(sa) + if err != nil { + return nil, err + } + + exp := time.Now().Add(loginDuration) + + body := &Login{ + UID: sa.AccountID, + Exp: exp.Unix(), + Token: token, + } + + octets, err := json.Marshal(body) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets)) + if err != nil { + return nil, err + } + req.Header.Add("Content-Type", "application/json") + + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + auth := &LoginAuth{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(auth) + if err != nil { + return nil, err + } + + token := &AuthToken{ + Text: auth.Token, + Expire: exp, + } + return token, nil + } + + loginError := &LoginError{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(loginError) + if err != nil { + err := &APIError{ + StatusCode: 
resp.StatusCode, + Title: resp.Status, + } + return nil, err + } + + err = &APIError{ + StatusCode: resp.StatusCode, + Title: loginError.Title, + Description: loginError.Description, + } + return nil, err +} + +func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) { + summary := &Summary{} + err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary) + if err != nil { + return nil, err + } + + return summary, nil +} + +func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Container, error) { + list := []string{} + + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) + err := c.doGet(ctx, c.url(path), &list) + if err != nil { + return nil, err + } + + containers := make([]Container, 0, len(list)) + for _, c := range list { + containers = append(containers, Container{ID: c}) + + } + + return containers, nil +} + +func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, error) { + metrics := &Metrics{} + + err := c.doGet(ctx, url, metrics) + if err != nil { + return nil, err + } + + return metrics, nil +} + +func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) + return c.getMetrics(ctx, c.url(path)) +} + +func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) + return c.getMetrics(ctx, c.url(path)) +} + +func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { + path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) + return c.getMetrics(ctx, c.url(path)) +} + +func createGetRequest(url string, token string) (*http.Request, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + if token != "" { + 
req.Header.Add("Authorization", "token="+token) + } + req.Header.Add("Accept", "application/json") + + return req, nil +} + +func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) error { + req, err := createGetRequest(url, c.token) + if err != nil { + return err + } + + select { + case c.semaphore <- struct{}{}: + break + case <-ctx.Done(): + return ctx.Err() + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + <-c.semaphore + return err + } + defer func() { + resp.Body.Close() + <-c.semaphore + }() + + // Clear invalid token if unauthorized + if resp.StatusCode == http.StatusUnauthorized { + c.token = "" + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return &APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + } + } + + if resp.StatusCode == http.StatusNoContent { + return nil + } + + err = json.NewDecoder(resp.Body).Decode(v) + return err +} + +func (c *ClusterClient) url(path string) string { + url := c.clusterURL + url.Path = path + return url.String() +} + +func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) { + token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ + UID: sa.AccountID, + StandardClaims: jwt.StandardClaims{ + // How long we have to login with this token + ExpiresAt: int64(5 * time.Minute / time.Second), + }, + }) + return token.SignedString(sa.PrivateKey) +} diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go new file mode 100644 index 000000000..2781d10b7 --- /dev/null +++ b/plugins/inputs/dcos/client_test.go @@ -0,0 +1,232 @@ +package dcos + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + jwt "github.com/dgrijalva/jwt-go" + "github.com/stretchr/testify/require" +) + +const ( + privateKey = `-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj +XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6 
+4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB +AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4 +I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd +bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj +hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD +dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM +PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q +EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub +BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo +Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G +P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge +-----END RSA PRIVATE KEY-----` +) + +func TestLogin(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedError error + expectedToken string + }{ + { + name: "Login successful", + responseCode: 200, + responseBody: `{"token": "XXX.YYY.ZZZ"}`, + expectedError: nil, + expectedToken: "XXX.YYY.ZZZ", + }, + { + name: "Unauthorized Error", + responseCode: http.StatusUnauthorized, + responseBody: `{"title": "x", "description": "y"}`, + expectedError: &APIError{http.StatusUnauthorized, "x", "y"}, + expectedToken: "", + }, + } + + key, err := jwt.ParseRSAPrivateKeyFromPEM([]byte(privateKey)) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + sa := &ServiceAccount{ + AccountID: "telegraf", + PrivateKey: key, + } + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) + auth, err := client.Login(ctx, sa) + + require.Equal(t, tt.expectedError, err) + + if tt.expectedToken != "" { + require.Equal(t, 
tt.expectedToken, auth.Text) + } else { + require.Nil(t, auth) + } + + ts.Close() + }) + } +} + +func TestGetSummary(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Summary + expectedError error + }{ + { + name: "No nodes", + responseCode: 200, + responseBody: `{"cluster": "a", "slaves": []}`, + expectedValue: &Summary{Cluster: "a", Slaves: []Slave{}}, + expectedError: nil, + }, + { + name: "Unauthorized Error", + responseCode: http.StatusUnauthorized, + responseBody: ``, + expectedValue: nil, + expectedError: &APIError{StatusCode: http.StatusUnauthorized, Title: "401 Unauthorized"}, + }, + { + name: "Has nodes", + responseCode: 200, + responseBody: `{"cluster": "a", "slaves": [{"id": "a"}, {"id": "b"}]}`, + expectedValue: &Summary{ + Cluster: "a", + Slaves: []Slave{ + Slave{ID: "a"}, + Slave{ID: "b"}, + }, + }, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) + summary, err := client.GetSummary(ctx) + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, summary) + + ts.Close() + }) + } + +} + +func TestGetNodeMetrics(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Metrics + expectedError error + }{ + { + name: "Empty Body", + responseCode: 200, + responseBody: `{}`, + expectedValue: &Metrics{}, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + 
w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) + m, err := client.GetNodeMetrics(ctx, "foo") + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, m) + + ts.Close() + }) + } + +} + +func TestGetContainerMetrics(t *testing.T) { + var tests = []struct { + name string + responseCode int + responseBody string + expectedValue *Metrics + expectedError error + }{ + { + name: "204 No Contents", + responseCode: 204, + responseBody: ``, + expectedValue: &Metrics{}, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the path + w.WriteHeader(tt.responseCode) + fmt.Fprintln(w, tt.responseBody) + }) + ts := httptest.NewServer(handler) + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + ctx := context.Background() + client := NewClusterClient(u, defaultResponseTimeout, 1, nil) + m, err := client.GetContainerMetrics(ctx, "foo", "bar") + + require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedValue, m) + + ts.Close() + }) + } + +} diff --git a/plugins/inputs/dcos/creds.go b/plugins/inputs/dcos/creds.go new file mode 100644 index 000000000..0178315bb --- /dev/null +++ b/plugins/inputs/dcos/creds.go @@ -0,0 +1,72 @@ +package dcos + +import ( + "context" + "crypto/rsa" + "fmt" + "io/ioutil" + "strings" + "time" + "unicode/utf8" +) + +const ( + // How long before expiration to renew token + relogDuration = 5 * time.Minute +) + +type Credentials interface { + Token(ctx context.Context, client Client) (string, error) + IsExpired() bool +} + +type ServiceAccount struct { + AccountID string + PrivateKey *rsa.PrivateKey + + auth *AuthToken +} + +type TokenCreds struct { + Path string +} + +type 
NullCreds struct { +} + +func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) { + auth, err := client.Login(ctx, c) + if err != nil { + return "", err + } + c.auth = auth + return auth.Text, nil +} + +func (c *ServiceAccount) IsExpired() bool { + return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now()) +} + +func (c *TokenCreds) Token(ctx context.Context, client Client) (string, error) { + octets, err := ioutil.ReadFile(c.Path) + if err != nil { + return "", fmt.Errorf("Error reading token file %q: %s", c.Path, err) + } + if !utf8.Valid(octets) { + return "", fmt.Errorf("Token file does not contain utf-8 encoded text: %s", c.Path) + } + token := strings.TrimSpace(string(octets)) + return token, nil +} + +func (c *TokenCreds) IsExpired() bool { + return true +} + +func (c *NullCreds) Token(ctx context.Context, client Client) (string, error) { + return "", nil +} + +func (c *NullCreds) IsExpired() bool { + return true +} diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go new file mode 100644 index 000000000..91370b81f --- /dev/null +++ b/plugins/inputs/dcos/dcos.go @@ -0,0 +1,435 @@ +package dcos + +import ( + "context" + "io/ioutil" + "net/url" + "sort" + "strings" + "sync" + "time" + + jwt "github.com/dgrijalva/jwt-go" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + defaultMaxConnections = 10 + defaultResponseTimeout = 20 * time.Second +) + +var ( + nodeDimensions = []string{ + "hostname", + "path", + "interface", + } + containerDimensions = []string{ + "hostname", + "container_id", + "task_name", + } + appDimensions = []string{ + "hostname", + "container_id", + "task_name", + } +) + +type DCOS struct { + ClusterURL string `toml:"cluster_url"` + + ServiceAccountID string `toml:"service_account_id"` + ServiceAccountPrivateKey string + + TokenFile string + + 
NodeInclude []string + NodeExclude []string + ContainerInclude []string + ContainerExclude []string + AppInclude []string + AppExclude []string + + MaxConnections int + ResponseTimeout internal.Duration + + SSLCA string `toml:"ssl_ca"` + SSLCert string `toml:"ssl_cert"` + SSLKey string `toml:"ssl_key"` + InsecureSkipVerify bool `toml:"insecure_skip_verify"` + + client Client + creds Credentials + + initialized bool + nodeFilter filter.Filter + containerFilter filter.Filter + appFilter filter.Filter + taskNameFilter filter.Filter +} + +func (d *DCOS) Description() string { + return "Input plugin for DC/OS metrics" +} + +var sampleConfig = ` + ## The DC/OS cluster URL. + cluster_url = "https://dcos-ee-master-1" + + ## The ID of the service account. + service_account_id = "telegraf" + ## The private key file for the service account. + service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem" + + ## Path containing login token. If set, will read on every gather. + # token_file = "/home/dcos/.dcos/token" + + ## In all filter options if both include and exclude are empty all items + ## will be collected. Arrays may contain glob patterns. + ## + ## Node IDs to collect metrics from. If a node is excluded, no metrics will + ## be collected for its containers or apps. + # node_include = [] + # node_exclude = [] + ## Container IDs to collect container metrics from. + # container_include = [] + # container_exclude = [] + ## Container IDs to collect app metrics from. + # app_include = [] + # app_exclude = [] + + ## Maximum concurrent connections to the cluster. + # max_connections = 10 + ## Maximum time to receive a response from cluster. + # response_timeout = "20s" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## If false, skip chain & host verification + # insecure_skip_verify = true + + ## Recommended filtering to reduce series cardinality. 
+ # [inputs.dcos.tagdrop] + # path = ["/var/lib/mesos/slave/slaves/*"] +` + +func (d *DCOS) SampleConfig() string { + return sampleConfig +} + +func (d *DCOS) Gather(acc telegraf.Accumulator) error { + err := d.init() + if err != nil { + return err + } + + ctx := context.Background() + + token, err := d.creds.Token(ctx, d.client) + if err != nil { + return err + } + d.client.SetToken(token) + + summary, err := d.client.GetSummary(ctx) + if err != nil { + return err + } + + var wg sync.WaitGroup + for _, node := range summary.Slaves { + wg.Add(1) + go func(node string) { + defer wg.Done() + d.GatherNode(ctx, acc, summary.Cluster, node) + }(node.ID) + } + wg.Wait() + + return nil +} + +func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { + if !d.nodeFilter.Match(node) { + return + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + m, err := d.client.GetNodeMetrics(ctx, node) + if err != nil { + acc.AddError(err) + return + } + d.addNodeMetrics(acc, cluster, m) + }() + + d.GatherContainers(ctx, acc, cluster, node) + wg.Wait() +} + +func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { + containers, err := d.client.GetContainers(ctx, node) + if err != nil { + acc.AddError(err) + return + } + + var wg sync.WaitGroup + for _, container := range containers { + if d.containerFilter.Match(container.ID) { + wg.Add(1) + go func(container string) { + defer wg.Done() + m, err := d.client.GetContainerMetrics(ctx, node, container) + if err != nil { + if err, ok := err.(APIError); ok && err.StatusCode == 404 { + return + } + acc.AddError(err) + return + } + d.addContainerMetrics(acc, cluster, m) + }(container.ID) + } + + if d.appFilter.Match(container.ID) { + wg.Add(1) + go func(container string) { + defer wg.Done() + m, err := d.client.GetAppMetrics(ctx, node, container) + if err != nil { + if err, ok := err.(APIError); ok && err.StatusCode == 404 { + return + } + 
acc.AddError(err) + return + } + d.addAppMetrics(acc, cluster, m) + }(container.ID) + } + } + wg.Wait() +} + +type point struct { + tags map[string]string + labels map[string]string + fields map[string]interface{} +} + +func (d *DCOS) createPoints(acc telegraf.Accumulator, m *Metrics) []*point { + points := make(map[string]*point) + for _, dp := range m.Datapoints { + fieldKey := strings.Replace(dp.Name, ".", "_", -1) + + tags := dp.Tags + if tags == nil { + tags = make(map[string]string) + } + + if dp.Unit == "bytes" && !strings.HasSuffix(fieldKey, "_bytes") { + fieldKey = fieldKey + "_bytes" + } + + if strings.HasPrefix(fieldKey, "dcos_metrics_module_") { + fieldKey = strings.TrimPrefix(fieldKey, "dcos_metrics_module_") + } + + tagset := make([]string, 0, len(tags)) + for k, v := range tags { + tagset = append(tagset, k+"="+v) + } + sort.Strings(tagset) + seriesParts := make([]string, 0, len(tagset)) + seriesParts = append(seriesParts, tagset...) + seriesKey := strings.Join(seriesParts, ",") + + p, ok := points[seriesKey] + if !ok { + p = &point{} + p.tags = tags + p.labels = make(map[string]string) + p.fields = make(map[string]interface{}) + points[seriesKey] = p + } + + if dp.Unit == "bytes" { + p.fields[fieldKey] = int64(dp.Value) + } else { + p.fields[fieldKey] = dp.Value + } + } + + results := make([]*point, 0, len(points)) + for _, p := range points { + for k, v := range m.Dimensions { + switch v := v.(type) { + case string: + p.tags[k] = v + case map[string]string: + if k == "labels" { + for k, v := range v { + p.labels[k] = v + } + } + } + } + results = append(results, p) + } + return results +} + +func (d *DCOS) addMetrics(acc telegraf.Accumulator, cluster, mname string, m *Metrics, tagDimensions []string) { + tm := time.Now() + + points := d.createPoints(acc, m) + + for _, p := range points { + tags := make(map[string]string) + tags["cluster"] = cluster + for _, tagkey := range tagDimensions { + v, ok := p.tags[tagkey] + if ok { + tags[tagkey] = v + } + 
} + for k, v := range p.labels { + tags[k] = v + } + + acc.AddFields(mname, p.fields, tags, tm) + } +} + +func (d *DCOS) addNodeMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_node", m, nodeDimensions) +} + +func (d *DCOS) addContainerMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_container", m, containerDimensions) +} + +func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *Metrics) { + d.addMetrics(acc, cluster, "dcos_app", m, appDimensions) +} + +func (d *DCOS) init() error { + if !d.initialized { + err := d.createFilters() + if err != nil { + return err + } + + if d.client == nil { + client, err := d.createClient() + if err != nil { + return err + } + d.client = client + } + + if d.creds == nil { + creds, err := d.createCredentials() + if err != nil { + return err + } + d.creds = creds + } + + d.initialized = true + } + return nil +} + +func (d *DCOS) createClient() (Client, error) { + tlsCfg, err := internal.GetTLSConfig( + d.SSLCert, d.SSLKey, d.SSLCA, d.InsecureSkipVerify) + if err != nil { + return nil, err + } + + url, err := url.Parse(d.ClusterURL) + if err != nil { + return nil, err + } + + client := NewClusterClient( + url, + d.ResponseTimeout.Duration, + d.MaxConnections, + tlsCfg, + ) + + return client, nil +} + +func (d *DCOS) createCredentials() (Credentials, error) { + if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" { + bs, err := ioutil.ReadFile(d.ServiceAccountPrivateKey) + if err != nil { + return nil, err + } + + privateKey, err := jwt.ParseRSAPrivateKeyFromPEM(bs) + if err != nil { + return nil, err + } + + creds := &ServiceAccount{ + AccountID: d.ServiceAccountID, + PrivateKey: privateKey, + } + return creds, nil + } else if d.TokenFile != "" { + creds := &TokenCreds{ + Path: d.TokenFile, + } + return creds, nil + } else { + creds := &NullCreds{} + return creds, nil + } +} + +func (d *DCOS) createFilters() 
error { + var err error + d.nodeFilter, err = filter.NewIncludeExcludeFilter( + d.NodeInclude, d.NodeExclude) + if err != nil { + return err + } + + d.containerFilter, err = filter.NewIncludeExcludeFilter( + d.ContainerInclude, d.ContainerExclude) + if err != nil { + return err + } + + d.appFilter, err = filter.NewIncludeExcludeFilter( + d.AppInclude, d.AppExclude) + if err != nil { + return err + } + + return nil +} + +func init() { + inputs.Add("dcos", func() telegraf.Input { + return &DCOS{ + MaxConnections: defaultMaxConnections, + ResponseTimeout: internal.Duration{ + Duration: defaultResponseTimeout, + }, + } + }) +} diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go new file mode 100644 index 000000000..6a76f7b64 --- /dev/null +++ b/plugins/inputs/dcos/dcos_test.go @@ -0,0 +1,441 @@ +package dcos + +import ( + "context" + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +type mockClient struct { + SetTokenF func(token string) + LoginF func(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) + GetSummaryF func(ctx context.Context) (*Summary, error) + GetContainersF func(ctx context.Context, node string) ([]Container, error) + GetNodeMetricsF func(ctx context.Context, node string) (*Metrics, error) + GetContainerMetricsF func(ctx context.Context, node, container string) (*Metrics, error) + GetAppMetricsF func(ctx context.Context, node, container string) (*Metrics, error) +} + +func (c *mockClient) SetToken(token string) { + c.SetTokenF(token) +} + +func (c *mockClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) { + return c.LoginF(ctx, sa) +} + +func (c *mockClient) GetSummary(ctx context.Context) (*Summary, error) { + return c.GetSummaryF(ctx) +} + +func (c *mockClient) GetContainers(ctx context.Context, node string) ([]Container, error) { + return c.GetContainersF(ctx, node) +} + +func (c *mockClient) GetNodeMetrics(ctx context.Context, 
node string) (*Metrics, error) { + return c.GetNodeMetricsF(ctx, node) +} + +func (c *mockClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { + return c.GetContainerMetricsF(ctx, node, container) +} + +func (c *mockClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { + return c.GetAppMetricsF(ctx, node, container) +} + +func TestAddNodeMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "basic datapoint conversion", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "process.count", + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + }, + "process_count", 42.0, + )} + }, + }, + { + name: "path added as tag", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "filesystem.inode.free", + Tags: map[string]string{ + "path": "/var/lib", + }, + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "path": "/var/lib", + }, + "filesystem_inode_free", 42.0, + )} + }, + }, + { + name: "interface added as tag", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "network.out.dropped", + Tags: map[string]string{ + "interface": "eth0", + }, + Unit: "count", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "interface": "eth0", + }, + "network_out_dropped", 42.0, + )} + }, + }, + { + name: "bytes unit appended to fieldkey", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "network.in", + Tags: map[string]string{ + "interface": "eth0", + }, + Unit: "bytes", + Value: 42.0, + }, + }, + }, + check: func(acc 
*testutil.Accumulator) []bool { + return []bool{acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "interface": "eth0", + }, + "network_in_bytes", int64(42), + )} + }, + }, + { + name: "dimensions added as tags", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "process.count", + Tags: map[string]string{}, + Unit: "count", + Value: 42.0, + }, + { + Name: "memory.total", + Tags: map[string]string{}, + Unit: "bytes", + Value: 42, + }, + }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "hostname": "192.168.122.18", + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "192.168.122.18", + }, + "process_count", 42.0), + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "192.168.122.18", + }, + "memory_total_bytes", int64(42)), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addNodeMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } + +} + +func TestAddContainerMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "container", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "net.rx.errors", + Tags: map[string]string{ + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "executor_name": "Command Executor (Task: telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a) (Command: NO EXECUTABLE)", + "framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001", + "source": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + }, + Unit: "count", + Value: 42.0, + }, 
+ }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "executor_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "framework_id": "ab2f3a8b-06db-4e8c-95b6-fb1940874a30-0001", + "framework_name": "marathon", + "framework_principal": "dcos_marathon", + "framework_role": "slave_public", + "hostname": "192.168.122.18", + "labels": map[string]string{ + "DCOS_SPACE": "/telegraf", + }, + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + "task_id": "telegraf.192fb45f-cc0c-11e7-af48-ea183c0b541a", + "task_name": "telegraf", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_container", + map[string]string{ + "cluster": "a", + "container_id": "f25c457b-fceb-44f0-8f5b-38be34cbb6fb", + "hostname": "192.168.122.18", + "task_name": "telegraf", + "DCOS_SPACE": "/telegraf", + }, + "net_rx_errors", + 42.0, + ), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addContainerMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +} + +func TestAddAppMetrics(t *testing.T) { + var tests = []struct { + name string + metrics *Metrics + check func(*testutil.Accumulator) []bool + }{ + { + name: "tags are optional", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "dcos.metrics.module.container_throttled_bytes_per_sec", + Unit: "", + Value: 42.0, + }, + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_app", + map[string]string{ + "cluster": "a", + }, + "container_throttled_bytes_per_sec", 42.0, + ), + } + }, + }, + { + name: "dimensions are tagged", + metrics: &Metrics{ + Datapoints: []DataPoint{ + { + Name: "dcos.metrics.module.container_throttled_bytes_per_sec", + Unit: "", + Value: 
42.0, + }, + }, + Dimensions: map[string]interface{}{ + "cluster_id": "c0760bbd-9e9d-434b-bd4a-39c7cdef8a63", + "container_id": "02d31175-1c01-4459-8520-ef8b1339bc52", + "hostname": "192.168.122.18", + "mesos_id": "2dfbbd28-29d2-411d-92c4-e2f84c38688e-S1", + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_app", + map[string]string{ + "cluster": "a", + "container_id": "02d31175-1c01-4459-8520-ef8b1339bc52", + "hostname": "192.168.122.18", + }, + "container_throttled_bytes_per_sec", 42.0, + ), + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{} + dcos.addAppMetrics(&acc, "a", tt.metrics) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +} + +func TestGatherFilterNode(t *testing.T) { + var tests = []struct { + name string + nodeInclude []string + nodeExclude []string + client Client + check func(*testutil.Accumulator) []bool + }{ + { + name: "cluster without nodes has no metrics", + client: &mockClient{ + SetTokenF: func(token string) {}, + GetSummaryF: func(ctx context.Context) (*Summary, error) { + return &Summary{ + Cluster: "a", + Slaves: []Slave{}, + }, nil + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.NMetrics() == 0, + } + }, + }, + { + name: "node include", + nodeInclude: []string{"x"}, + client: &mockClient{ + SetTokenF: func(token string) {}, + GetSummaryF: func(ctx context.Context) (*Summary, error) { + return &Summary{ + Cluster: "a", + Slaves: []Slave{ + Slave{ID: "x"}, + Slave{ID: "y"}, + }, + }, nil + }, + GetContainersF: func(ctx context.Context, node string) ([]Container, error) { + return []Container{}, nil + }, + GetNodeMetricsF: func(ctx context.Context, node string) (*Metrics, error) { + return &Metrics{ + Datapoints: []DataPoint{ + { + Name: "value", + Value: 42.0, + }, + }, + Dimensions: map[string]interface{}{ 
+ "hostname": "x", + }, + }, nil + }, + }, + check: func(acc *testutil.Accumulator) []bool { + return []bool{ + acc.HasPoint( + "dcos_node", + map[string]string{ + "cluster": "a", + "hostname": "x", + }, + "value", 42.0, + ), + acc.NMetrics() == 1, + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator + dcos := &DCOS{ + NodeInclude: tt.nodeInclude, + NodeExclude: tt.nodeExclude, + client: tt.client, + } + err := dcos.Gather(&acc) + require.NoError(t, err) + for i, ok := range tt.check(&acc) { + require.True(t, ok, fmt.Sprintf("Index was not true: %d", i)) + } + }) + } +}