Added metrics for Mesos slaves and tasks running on them.

closes #1356
This commit is contained in:
Łukasz Harasimowicz 2016-06-09 12:33:14 +02:00 committed by Cameron Sparr
parent 29ea433763
commit ee240a5599
3 changed files with 800 additions and 218 deletions

View File

@ -1,6 +1,6 @@
# Mesos Input Plugin # Mesos Input Plugin
This input plugin gathers metrics from Mesos (*currently only Mesos masters*). This input plugin gathers metrics from Mesos.
For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page. For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page.
### Configuration: ### Configuration:
@ -8,14 +8,41 @@ For more information, please check the [Mesos Observability Metrics](http://meso
```toml ```toml
# Telegraf plugin for gathering metrics from N Mesos masters # Telegraf plugin for gathering metrics from N Mesos masters
[[inputs.mesos]] [[inputs.mesos]]
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"] master_collections = [
"resources",
"master",
"system",
"agents",
"frameworks",
"tasks",
"messages",
"evqueue",
"registrar",
]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
``` ```
By dafault this plugin is not configured to gather metrics from mesos. Since mesos cluster can be deployed in numerous ways it does not provide ane default
values in that matter. User needs to specify master/slave nodes this plugin will gather metrics from. Additionally by enabling `slave_tasks` will allow
agthering metrics from takss runing on specified slaves (this options is disabled by default).
### Measurements & Fields: ### Measurements & Fields:
Mesos master metric groups Mesos master metric groups
@ -33,6 +60,12 @@ Mesos master metric groups
- master/disk_revocable_percent - master/disk_revocable_percent
- master/disk_revocable_total - master/disk_revocable_total
- master/disk_revocable_used - master/disk_revocable_used
- master/gpus_percent
- master/gpus_used
- master/gpus_total
- master/gpus_revocable_percent
- master/gpus_revocable_total
- master/gpus_revocable_used
- master/mem_percent - master/mem_percent
- master/mem_used - master/mem_used
- master/mem_total - master/mem_total
@ -136,17 +169,111 @@ Mesos master metric groups
- registrar/state_store_ms/p999 - registrar/state_store_ms/p999
- registrar/state_store_ms/p9999 - registrar/state_store_ms/p9999
Mesos slave metric groups
- resources
- slave/cpus_percent
- slave/cpus_used
- slave/cpus_total
- slave/cpus_revocable_percent
- slave/cpus_revocable_total
- slave/cpus_revocable_used
- slave/disk_percent
- slave/disk_used
- slave/disk_total
- slave/disk_revocable_percent
- slave/disk_revocable_total
- slave/disk_revocable_used
- slave/gpus_percent
- slave/gpus_used
- slave/gpus_total,
- slave/gpus_revocable_percent
- slave/gpus_revocable_total
- slave/gpus_revocable_used
- slave/mem_percent
- slave/mem_used
- slave/mem_total
- slave/mem_revocable_percent
- slave/mem_revocable_total
- slave/mem_revocable_used
- agent
- slave/registered
- slave/uptime_secs
- system
- system/cpus_total
- system/load_15min
- system/load_5min
- system/load_1min
- system/mem_free_bytes
- system/mem_total_bytes
- executors
- containerizer/mesos/container_destroy_errors
- slave/container_launch_errors
- slave/executors_preempted
- slave/frameworks_active
- slave/executor_directory_max_allowed_age_secs
- slave/executors_registering
- slave/executors_running
- slave/executors_terminated
- slave/executors_terminating
- slave/recovery_errors
- tasks
- slave/tasks_failed
- slave/tasks_finished
- slave/tasks_killed
- slave/tasks_lost
- slave/tasks_running
- slave/tasks_staging
- slave/tasks_starting
- messages
- slave/invalid_framework_messages
- slave/invalid_status_updates
- slave/valid_framework_messages
- slave/valid_status_updates
Mesos tasks metric groups
- executor_id
- executor_name
- framework_id
- source
- statistics (all metrics below will have `statistics_` prefix included in their names
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
- mem_anon_bytes
- mem_cache_bytes
- mem_critical_pressure_counter
- mem_file_bytes
- mem_limit_bytes
- mem_low_pressure_counter
- mem_mapped_file_bytes
- mem_medium_pressure_counter
- mem_rss_bytes
- mem_swap_bytes
- mem_total_bytes
- mem_total_memsw_bytes
- mem_unevictable_bytes
- timestamp
### Tags: ### Tags:
- All measurements have the following tags: - All master/slave measurements have the following tags:
- server
- role (master/slave)
- Tasks measurements have the following tags:
- server - server
### Example Output: ### Example Output:
``` ```
$ telegraf -config ~/mesos.conf -input-filter mesos -test $ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1 * Plugin: mesos, Collection 1
mesos,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0, master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2, master/cpus_revocable_used=0,master/cpus_total=2,
master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0, master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0,
@ -163,3 +290,16 @@ master/mem_revocable_used=0,master/mem_total=1002,
master/mem_used=0,master/messages_authenticate=0, master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ... master/messages_deactivate_framework=0 ...
``` ```
Meoso tasks metrics (if enabled):
```
mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14,
statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928,
statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928,
statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0,
statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0,
statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016,
statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0,
statistics_timestamp=1465486052.70525 1465486053052811792...
```

View File

@ -17,33 +17,57 @@ import (
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
type Role string
const (
MASTER Role = "master"
SLAVE = "slave"
)
type Mesos struct { type Mesos struct {
Timeout int Timeout int
Masters []string Masters []string
MasterCols []string `toml:"master_collections"` MasterCols []string `toml:"master_collections"`
Slaves []string
SlaveCols []string `toml:"slave_collections"`
SlaveTasks bool
} }
var defaultMetrics = []string{ var allMetrics = map[Role][]string{
"resources", "master", "system", "slaves", "frameworks", MASTER: []string{"resources", "master", "system", "agents", "frameworks", "tasks", "messages", "evqueue", "registrar"},
"tasks", "messages", "evqueue", "messages", "registrar", SLAVE: []string{"resources", "agent", "system", "executors", "tasks", "messages"},
} }
var sampleConfig = ` var sampleConfig = `
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
"master", "master",
"system", "system",
"slaves", "agents",
"frameworks", "frameworks",
"tasks",
"messages", "messages",
"evqueue", "evqueue",
"registrar", "registrar",
] ]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
` `
// SampleConfig returns a sample configuration block // SampleConfig returns a sample configuration block
@ -56,21 +80,54 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters" return "Telegraf plugin for gathering metrics from N Mesos masters"
} }
func (m *Mesos) SetDefaults() {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
if len(m.SlaveCols) == 0 {
m.SlaveCols = allMetrics[SLAVE]
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
}
// Gather() metrics from given list of Mesos Masters // Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error { func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var errorChannel chan error var errorChannel chan error
if len(m.Masters) == 0 { m.SetDefaults()
m.Masters = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Masters)*2) errorChannel = make(chan error, len(m.Masters)+2*len(m.Slaves))
for _, v := range m.Masters { for _, v := range m.Masters {
wg.Add(1) wg.Add(1)
go func(c string) { go func(c string) {
errorChannel <- m.gatherMetrics(c, acc) errorChannel <- m.gatherMainMetrics(c, ":5050", MASTER, acc)
wg.Done()
return
}(v)
}
for _, v := range m.Slaves {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc)
wg.Done()
return
}(v)
if !m.SlaveTasks {
continue
}
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
wg.Done() wg.Done()
return return
}(v) }(v)
@ -94,7 +151,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
} }
// metricsDiff() returns set names for removal // metricsDiff() returns set names for removal
func metricsDiff(w []string) []string { func metricsDiff(role Role, w []string) []string {
b := []string{} b := []string{}
s := make(map[string]bool) s := make(map[string]bool)
@ -106,7 +163,7 @@ func metricsDiff(w []string) []string {
s[v] = true s[v] = true
} }
for _, d := range defaultMetrics { for _, d := range allMetrics[role] {
if _, ok := s[d]; !ok { if _, ok := s[d]; !ok {
b = append(b, d) b = append(b, d)
} }
@ -116,156 +173,239 @@ func metricsDiff(w []string) []string {
} }
// masterBlocks serves as kind of metrics registry groupping them in sets // masterBlocks serves as kind of metrics registry groupping them in sets
func masterBlocks(g string) []string { func getMetrics(role Role, group string) []string {
var m map[string][]string var m map[string][]string
m = make(map[string][]string) m = make(map[string][]string)
m["resources"] = []string{ if role == MASTER {
"master/cpus_percent", m["resources"] = []string{
"master/cpus_used", "master/cpus_percent",
"master/cpus_total", "master/cpus_used",
"master/cpus_revocable_percent", "master/cpus_total",
"master/cpus_revocable_total", "master/cpus_revocable_percent",
"master/cpus_revocable_used", "master/cpus_revocable_total",
"master/disk_percent", "master/cpus_revocable_used",
"master/disk_used", "master/disk_percent",
"master/disk_total", "master/disk_used",
"master/disk_revocable_percent", "master/disk_total",
"master/disk_revocable_total", "master/disk_revocable_percent",
"master/disk_revocable_used", "master/disk_revocable_total",
"master/mem_percent", "master/disk_revocable_used",
"master/mem_used", "master/gpus_percent",
"master/mem_total", "master/gpus_used",
"master/mem_revocable_percent", "master/gpus_total",
"master/mem_revocable_total", "master/gpus_revocable_percent",
"master/mem_revocable_used", "master/gpus_revocable_total",
"master/gpus_revocable_used",
"master/mem_percent",
"master/mem_used",
"master/mem_total",
"master/mem_revocable_percent",
"master/mem_revocable_total",
"master/mem_revocable_used",
}
m["master"] = []string{
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["agents"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
} else if role == SLAVE {
m["resources"] = []string{
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
}
m["agent"] = []string{
"slave/registered",
"slave/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["executors"] = []string{
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
}
m["tasks"] = []string{
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
}
m["messages"] = []string{
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
} }
m["master"] = []string{ ret, ok := m[group]
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["slaves"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
ret, ok := m[g]
if !ok { if !ok {
log.Println("[mesos] Unkown metrics group: ", g) log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group)
return []string{} return []string{}
} }
return ret return ret
} }
// removeGroup(), remove unwanted sets func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
func (m *Mesos) removeGroup(j *map[string]interface{}) {
var ok bool var ok bool
var selectedMetrics []string
b := metricsDiff(m.MasterCols) if role == MASTER {
selectedMetrics = m.MasterCols
} else if role == SLAVE {
selectedMetrics = m.SlaveCols
}
for _, k := range b { for _, k := range metricsDiff(role, selectedMetrics) {
for _, v := range masterBlocks(k) { for _, v := range getMetrics(role, k) {
if _, ok = (*j)[v]; ok { if _, ok = (*metrics)[v]; ok {
delete((*j), v) delete((*metrics), v)
} }
} }
} }
@ -280,23 +420,66 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
// This should not belong to the object func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var metrics []map[string]interface{}
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a) host, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
host = a host = address
a = a + ":5050" address = address + defaultPort
} }
tags := map[string]string{ tags := map[string]string{
"server": host, "server": host,
} }
if m.Timeout == 0 { ts := strconv.Itoa(m.Timeout) + "ms"
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100 resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil {
return err
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if err = json.Unmarshal([]byte(data), &metrics); err != nil {
return errors.New("Error decoding JSON response")
}
for _, task := range metrics {
tags["task_id"] = task["executor_id"].(string)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", task)
if err != nil {
return err
}
acc.AddFields("mesos-tasks", jf.Fields, tags)
}
return nil
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{
"server": host,
"role": string(role),
} }
ts := strconv.Itoa(m.Timeout) + "ms" ts := strconv.Itoa(m.Timeout) + "ms"
@ -317,7 +500,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
return errors.New("Error decoding JSON response") return errors.New("Error decoding JSON response")
} }
m.removeGroup(&jsonOut) m.filterMetrics(role, &jsonOut)
jf := jsonparser.JSONFlattener{} jf := jsonparser.JSONFlattener{}

View File

@ -2,70 +2,275 @@ package mesos
import ( import (
"encoding/json" "encoding/json"
"fmt"
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"testing" "testing"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
var mesosMetrics map[string]interface{} var masterMetrics map[string]interface{}
var ts *httptest.Server var masterTestServer *httptest.Server
var slaveMetrics map[string]interface{}
var slaveTaskMetrics map[string]interface{}
var slaveTestServer *httptest.Server
func randUUID() string {
b := make([]byte, 16)
rand.Read(b)
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}
func generateMetrics() { func generateMetrics() {
mesosMetrics = make(map[string]interface{}) masterMetrics = make(map[string]interface{})
metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total", metricNames := []string{
"master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", // resources
"master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", "master/cpus_percent",
"master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", "master/cpus_used",
"master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", "master/cpus_total",
"master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total", "master/cpus_revocable_percent",
"system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", "master/cpus_revocable_total",
"system/mem_total_bytes", "master/slave_registrations", "master/slave_removals", "master/cpus_revocable_used",
"master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", "master/disk_percent",
"master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", "master/disk_used",
"master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active", "master/disk_total",
"master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", "master/disk_revocable_percent",
"master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished", "master/disk_revocable_total",
"master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", "master/disk_revocable_used",
"master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", "master/gpus_percent",
"master/invalid_status_update_acknowledgements", "master/invalid_status_updates", "master/gpus_used",
"master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", "master/gpus_total",
"master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", "master/gpus_revocable_percent",
"master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", "master/gpus_revocable_total",
"master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", "master/gpus_revocable_used",
"master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", "master/mem_percent",
"master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", "master/mem_used",
"master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", "master/mem_total",
"master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", "master/mem_revocable_percent",
"master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", "master/mem_revocable_total",
"master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", "master/mem_revocable_used",
"master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", // master
"master/valid_executor_to_framework_messages", "master/event_queue_dispatches", "master/elected",
"master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms", "master/uptime_secs",
"registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", // system
"registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", "system/cpus_total",
"registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"} "system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// agents
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
// frameworks
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
// tasks
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
// messages
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
// evgqueue
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
// registrar
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
for _, k := range metricNames { for _, k := range metricNames {
mesosMetrics[k] = rand.Float64() masterMetrics[k] = rand.Float64()
}
slaveMetrics = make(map[string]interface{})
metricNames = []string{
// resources
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
// agent
"slave/registered",
"slave/uptime_secs",
// system
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// executors
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
// tasks
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
// messages
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
for _, k := range metricNames {
slaveMetrics[k] = rand.Float64()
}
slaveTaskMetrics = map[string]interface{}{
"executor_id": fmt.Sprintf("task_%s", randUUID()),
"executor_name": "Some task description",
"framework_id": randUUID(),
"source": fmt.Sprintf("task_source_%s", randUUID()),
"statistics": map[string]interface{}{
"cpus_limit": rand.Float64(),
"cpus_system_time_secs": rand.Float64(),
"cpus_user_time_secs": rand.Float64(),
"mem_anon_bytes": float64(rand.Int63()),
"mem_cache_bytes": float64(rand.Int63()),
"mem_critical_pressure_counter": float64(rand.Int63()),
"mem_file_bytes": float64(rand.Int63()),
"mem_limit_bytes": float64(rand.Int63()),
"mem_low_pressure_counter": float64(rand.Int63()),
"mem_mapped_file_bytes": float64(rand.Int63()),
"mem_medium_pressure_counter": float64(rand.Int63()),
"mem_rss_bytes": float64(rand.Int63()),
"mem_swap_bytes": float64(rand.Int63()),
"mem_total_bytes": float64(rand.Int63()),
"mem_total_memsw_bytes": float64(rand.Int63()),
"mem_unevictable_bytes": float64(rand.Int63()),
"timestamp": rand.Float64(),
},
} }
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
generateMetrics() generateMetrics()
r := http.NewServeMux()
r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { masterRouter := http.NewServeMux()
masterRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(mesosMetrics) json.NewEncoder(w).Encode(masterMetrics)
}) })
ts = httptest.NewServer(r) masterTestServer = httptest.NewServer(masterRouter)
slaveRouter := http.NewServeMux()
slaveRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(slaveMetrics)
})
slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
})
slaveTestServer = httptest.NewServer(slaveRouter)
rc := m.Run() rc := m.Run()
ts.Close()
masterTestServer.Close()
slaveTestServer.Close()
os.Exit(rc) os.Exit(rc)
} }
@ -73,7 +278,7 @@ func TestMesosMaster(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
m := Mesos{ m := Mesos{
Masters: []string{ts.Listener.Addr().String()}, Masters: []string{masterTestServer.Listener.Addr().String()},
Timeout: 10, Timeout: 10,
} }
@ -83,34 +288,88 @@ func TestMesosMaster(t *testing.T) {
t.Errorf(err.Error()) t.Errorf(err.Error())
} }
acc.AssertContainsFields(t, "mesos", mesosMetrics) acc.AssertContainsFields(t, "mesos", masterMetrics)
} }
func TestRemoveGroup(t *testing.T) { func TestMasterFilter(t *testing.T) {
generateMetrics()
m := Mesos{ m := Mesos{
MasterCols: []string{ MasterCols: []string{
"resources", "master", "registrar", "resources", "master", "registrar",
}, },
} }
b := []string{ b := []string{
"system", "slaves", "frameworks", "system", "agents", "frameworks",
"messages", "evqueue", "messages", "evqueue", "tasks",
} }
m.removeGroup(&mesosMetrics) m.filterMetrics(MASTER, &masterMetrics)
for _, v := range b { for _, v := range b {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; ok { if _, ok := masterMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x) t.Errorf("Found key %s, it should be gone.", x)
} }
} }
} }
for _, v := range m.MasterCols { for _, v := range m.MasterCols {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; !ok { if _, ok := masterMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x)
}
}
}
}
func TestMesosSlave(t *testing.T) {
var acc testutil.Accumulator
m := Mesos{
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
SlaveTasks: true,
Timeout: 10,
}
err := m.Gather(&acc)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos", slaveMetrics)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", slaveTaskMetrics)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos-tasks", jf.Fields)
}
func TestSlaveFilter(t *testing.T) {
m := Mesos{
SlaveCols: []string{
"resources", "agent", "tasks",
},
}
b := []string{
"system", "executors", "messages",
}
m.filterMetrics(SLAVE, &slaveMetrics)
for _, v := range b {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x)
}
}
}
for _, v := range m.MasterCols {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x) t.Errorf("Didn't find key %s, it should present.", x)
} }
} }