Disable Mesos task statistics until we find a better way to deal with them.

Due to the very real problem of generating a vast number of data series through
Mesos task metrics, this feature is disabled until a better solution is found.
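To illustrate the cardinality problem: every task (re)launch embeds a fresh UUID in its `executor_id`/`task_id`, so every launch mints a new tag set and therefore a brand-new InfluxDB series. A minimal, self-contained sketch (the task and restart counts are made up; `randUUID` mirrors the helper in the plugin's tests):

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// randUUID returns a random RFC-4122-style identifier, similar to the
// helper used in mesos_test.go.
func randUUID() string {
	u := make([]byte, 16)
	if _, err := rand.Read(u); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
}

func main() {
	// Hypothetical workload: 200 tasks, each rescheduled 50 times.
	tasks, restarts := 200, 50
	series := make(map[string]struct{})
	for i := 0; i < tasks*restarts; i++ {
		// Each launch carries a fresh UUID in its executor_id, so each
		// launch produces a distinct tag set, i.e. a new series.
		key := fmt.Sprintf("mesos_tasks,server=mesos-1,executor_id=some_app.%s", randUUID())
		series[key] = struct{}{}
	}
	fmt.Println(len(series)) // 10000 distinct series from one modest cluster
}
```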
Łukasz Harasimowicz 2016-09-23 21:39:59 +02:00 committed by Cameron Sparr
parent 80391bfe1f
commit 32268fb25b
3 changed files with 57 additions and 95 deletions

plugins/inputs/mesos/README.md

@@ -35,16 +35,10 @@ For more information, please check the [Mesos Observability Metrics](http://meso
   #   "tasks",
   #   "messages",
   # ]
-  ## Include mesos tasks statistics, default is false
-  # slave_tasks = true
-  ## Should tags in slave task metrics be normalized? This will remove UUIDs from
-  ## task_id tag so we don't generate millions of series in InfluxDB, default is false
-  # slave_tasks_normalize = true
 ```
 By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
-values. User needs to specify master/slave nodes this plugin will gather metrics from. Additionally, enabling `slave_tasks` will allow
-gathering metrics from tasks running on specified slaves (this option is disabled by default).
+values. User needs to specify master/slave nodes this plugin will gather metrics from.

 ### Measurements & Fields:
@@ -238,27 +232,6 @@ Mesos slave metric groups
     - slave/valid_framework_messages
     - slave/valid_status_updates

-Mesos tasks metric groups
-- executor_id
-- cpus_limit
-- cpus_system_time_secs
-- cpus_user_time_secs
-- mem_anon_bytes
-- mem_cache_bytes
-- mem_critical_pressure_counter
-- mem_file_bytes
-- mem_limit_bytes
-- mem_low_pressure_counter
-- mem_mapped_file_bytes
-- mem_medium_pressure_counter
-- mem_rss_bytes
-- mem_swap_bytes
-- mem_total_bytes
-- mem_total_memsw_bytes
-- mem_unevictable_bytes
-- timestamp

 ### Tags:
 - All master/slave measurements have the following tags:
@@ -268,10 +241,6 @@ Mesos tasks metric groups
 - All master measurements have the extra tags:
     - state (leader/follower)

-- Tasks measurements have the following tags:
-    - server
-    - framework_id

 ### Example Output:
 ```
 $ telegraf -config ~/mesos.conf -input-filter mesos -test
@@ -295,9 +264,3 @@ master/mem_used=0,master/messages_authenticate=0,
 master/messages_deactivate_framework=0 ...
 ```
-Mesos tasks metrics (if enabled):
-```
-> mesos_tasks,framework_id=20151016-120318-1243483658-5050-6139-0000,host=localhost,server=mesos-1
-cpus_limit=0.2,cpus_system_time_secs=84.04,cpus_user_time_secs=1161,executor_id="some_app.5d9f3cf8-6b19-11e6-8d24-0242f3fd597e",
-mem_limit_bytes=348127232,mem_rss_bytes=310820864,timestamp=1472572204.22177 1472572204000000000...
-```
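With the two `slave_tasks*` options removed, the README's sample configuration reduces to roughly the following. This is a sketch, not the verbatim file: the keys are taken from the struct's toml tags in mesos.go below, and the collection lists show only the group names visible in the truncated sample above ("tasks", "messages"); values are illustrative.

```toml
[[inputs.mesos]]
  ## Timeout, in ms.
  timeout = 100
  ## A list of Mesos masters.
  masters = ["localhost:5050"]
  ## Master metrics groups to be collected; by default all are enabled.
  # master_collections = [
  #   "tasks",
  #   "messages",
  # ]
  ## A list of Mesos slaves; default is empty.
  # slaves = ["localhost:5051"]
  ## Slave metrics groups to be collected; by default all are enabled.
  # slave_collections = [
  #   "tasks",
  #   "messages",
  # ]
```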

plugins/inputs/mesos/mesos.go

@@ -30,7 +30,7 @@ type Mesos struct {
 	MasterCols []string `toml:"master_collections"`
 	Slaves     []string
 	SlaveCols  []string `toml:"slave_collections"`
-	SlaveTasks bool
+	//SlaveTasks bool
 }

 var allMetrics = map[Role][]string{
@@ -66,8 +66,6 @@ var sampleConfig = `
   #   "tasks",
   #   "messages",
   # ]
-  ## Include mesos tasks statistics, default is false
-  # slave_tasks = true
 `

 // SampleConfig returns a sample configuration block
@@ -121,16 +119,16 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
 			return
 		}(v)

-		if !m.SlaveTasks {
-			continue
-		}
-		wg.Add(1)
-		go func(c string) {
-			errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
-			wg.Done()
-			return
-		}(v)
+		// if !m.SlaveTasks {
+		// 	continue
+		// }
+		// wg.Add(1)
+		// go func(c string) {
+		// 	errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
+		// 	wg.Done()
+		// 	return
+		// }(v)
 	}
 	wg.Wait()
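For reference, the disabled block reused the Gather loop's existing fan-out pattern: one goroutine per slave, results funnelled through a buffered error channel and joined with a WaitGroup. A stripped-down, self-contained sketch of that pattern (the `gather` function and host list are stand-ins, not the plugin's API):

```go
package main

import (
	"fmt"
	"sync"
)

// gather stands in for m.gatherSlaveMetrics / m.gatherSlaveTaskMetrics.
func gather(host string) error {
	fmt.Println("gathering from", host)
	return nil
}

func main() {
	slaves := []string{"slave-1:5051", "slave-2:5051"}

	var wg sync.WaitGroup
	// Buffered to len(slaves) so no goroutine blocks on send.
	errorChannel := make(chan error, len(slaves))

	for _, v := range slaves {
		wg.Add(1)
		// Pass v as a parameter so each goroutine captures its own host.
		go func(c string) {
			defer wg.Done()
			errorChannel <- gather(c)
		}(v)
	}
	wg.Wait()
	close(errorChannel)

	// Drain the channel and report any collection errors.
	for err := range errorChannel {
		if err != nil {
			fmt.Println("error:", err)
		}
	}
}
```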

plugins/inputs/mesos/mesos_test.go

@@ -15,7 +15,8 @@ import (
 var masterMetrics map[string]interface{}
 var masterTestServer *httptest.Server
 var slaveMetrics map[string]interface{}
-var slaveTaskMetrics map[string]interface{}
+
+// var slaveTaskMetrics map[string]interface{}
 var slaveTestServer *httptest.Server

 func randUUID() string {
@@ -215,31 +216,31 @@ func generateMetrics() {
 		slaveMetrics[k] = rand.Float64()
 	}

-	slaveTaskMetrics = map[string]interface{}{
-		"executor_id":   fmt.Sprintf("task_name.%s", randUUID()),
-		"executor_name": "Some task description",
-		"framework_id":  randUUID(),
-		"source":        fmt.Sprintf("task_source.%s", randUUID()),
-		"statistics": map[string]interface{}{
-			"cpus_limit":                    rand.Float64(),
-			"cpus_system_time_secs":         rand.Float64(),
-			"cpus_user_time_secs":           rand.Float64(),
-			"mem_anon_bytes":                float64(rand.Int63()),
-			"mem_cache_bytes":               float64(rand.Int63()),
-			"mem_critical_pressure_counter": float64(rand.Int63()),
-			"mem_file_bytes":                float64(rand.Int63()),
-			"mem_limit_bytes":               float64(rand.Int63()),
-			"mem_low_pressure_counter":      float64(rand.Int63()),
-			"mem_mapped_file_bytes":         float64(rand.Int63()),
-			"mem_medium_pressure_counter":   float64(rand.Int63()),
-			"mem_rss_bytes":                 float64(rand.Int63()),
-			"mem_swap_bytes":                float64(rand.Int63()),
-			"mem_total_bytes":               float64(rand.Int63()),
-			"mem_total_memsw_bytes":         float64(rand.Int63()),
-			"mem_unevictable_bytes":         float64(rand.Int63()),
-			"timestamp":                     rand.Float64(),
-		},
-	}
+	// slaveTaskMetrics = map[string]interface{}{
+	// 	"executor_id":   fmt.Sprintf("task_name.%s", randUUID()),
+	// 	"executor_name": "Some task description",
+	// 	"framework_id":  randUUID(),
+	// 	"source":        fmt.Sprintf("task_source.%s", randUUID()),
+	// 	"statistics": map[string]interface{}{
+	// 		"cpus_limit":                    rand.Float64(),
+	// 		"cpus_system_time_secs":         rand.Float64(),
+	// 		"cpus_user_time_secs":           rand.Float64(),
+	// 		"mem_anon_bytes":                float64(rand.Int63()),
+	// 		"mem_cache_bytes":               float64(rand.Int63()),
+	// 		"mem_critical_pressure_counter": float64(rand.Int63()),
+	// 		"mem_file_bytes":                float64(rand.Int63()),
+	// 		"mem_limit_bytes":               float64(rand.Int63()),
+	// 		"mem_low_pressure_counter":      float64(rand.Int63()),
+	// 		"mem_mapped_file_bytes":         float64(rand.Int63()),
+	// 		"mem_medium_pressure_counter":   float64(rand.Int63()),
+	// 		"mem_rss_bytes":                 float64(rand.Int63()),
+	// 		"mem_swap_bytes":                float64(rand.Int63()),
+	// 		"mem_total_bytes":               float64(rand.Int63()),
+	// 		"mem_total_memsw_bytes":         float64(rand.Int63()),
+	// 		"mem_unevictable_bytes":         float64(rand.Int63()),
+	// 		"timestamp":                     rand.Float64(),
+	// 	},
+	// }
 }

 func TestMain(m *testing.M) {
@@ -259,11 +260,11 @@ func TestMain(m *testing.M) {
 		w.Header().Set("Content-Type", "application/json")
 		json.NewEncoder(w).Encode(slaveMetrics)
 	})
-	slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusOK)
-		w.Header().Set("Content-Type", "application/json")
-		json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
-	})
+	// slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
+	// 	w.WriteHeader(http.StatusOK)
+	// 	w.Header().Set("Content-Type", "application/json")
+	// 	json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
+	// })
 	slaveTestServer = httptest.NewServer(slaveRouter)

 	rc := m.Run()
@@ -323,10 +324,10 @@ func TestMesosSlave(t *testing.T) {
 	var acc testutil.Accumulator

 	m := Mesos{
 		Masters: []string{},
 		Slaves:  []string{slaveTestServer.Listener.Addr().String()},
-		SlaveTasks: true,
+		// SlaveTasks: true,
 		Timeout: 10,
 	}

 	err := m.Gather(&acc)
@@ -337,17 +338,17 @@ func TestMesosSlave(t *testing.T) {
 	acc.AssertContainsFields(t, "mesos", slaveMetrics)

-	expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1)
-	for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) {
-		expectedFields[k] = v
-	}
-	expectedFields["executor_id"] = slaveTaskMetrics["executor_id"]
-	acc.AssertContainsTaggedFields(
-		t,
-		"mesos_tasks",
-		expectedFields,
-		map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)})
+	// expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1)
+	// for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) {
+	// 	expectedFields[k] = v
+	// }
+	// expectedFields["executor_id"] = slaveTaskMetrics["executor_id"]
+	// acc.AssertContainsTaggedFields(
+	// 	t,
+	// 	"mesos_tasks",
+	// 	expectedFields,
+	// 	map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)})
 }

 func TestSlaveFilter(t *testing.T) {