Some improvment in mesos input plugin, (#1572)

* Some improvment in mesos input plugin,
     Removing uneeded statistics prefix for task's metric,
     Adding framework id tags into each task's metric,
     Adding state (leader/follower) tags to master's metric,
     Make sure the slave's metrics are tags with slave

* typo, replacing cpus_total with elected to determine leader

* Remove remaining statistics_ from sample

* using timestamp from mesos as metric timestamp

* change mesos-tasks to mesos_tasks, measurement

* change measurement name in test

* Replace follower by standby
This commit is contained in:
tuier 2016-08-30 15:25:29 +01:00 committed by Cameron Sparr
parent 602a36e241
commit 30cdc31a27
3 changed files with 44 additions and 18 deletions

View File

@ -241,7 +241,7 @@ Mesos tasks metric groups
- executor_name
- framework_id
- source
- statistics (all metrics below will have `statistics_` prefix included in their names
- statistics
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
@ -266,14 +266,20 @@ Mesos tasks metric groups
- server
- role (master/slave)
- Tasks measurements have the following tags:
- All master measurements have the extra tags:
- state (leader/follower)
- Tasks measurements have the following tags:
- server
- framework_id
- task_id
### Example Output:
```
$ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1
mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0,
mesos,role=master,state=leader,host=172.17.8.102,server=172.17.8.101
allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2,
master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0,
@ -293,13 +299,13 @@ master/messages_deactivate_framework=0 ...
Meoso tasks metrics (if enabled):
```
mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14,
statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928,
statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928,
statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0,
statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0,
statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016,
statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0,
statistics_timestamp=1465486052.70525 1465486053052811792...
mesos-tasks,host=172.17.8.102,server=172.17.8.101,framework_id=e3060235-c4ed-4765-9d36-784e3beca07f-0000,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
cpus_limit=0.2,cpus_system_time_secs=142.49,cpus_user_time_secs=388.14,
mem_anon_bytes=359129088,mem_cache_bytes=3964928,
mem_critical_pressure_counter=0,mem_file_bytes=3964928,
mem_limit_bytes=767557632,mem_low_pressure_counter=0,
mem_mapped_file_bytes=114688,mem_medium_pressure_counter=0,
mem_rss_bytes=359129088,mem_swap_bytes=0,mem_total_bytes=363094016,
mem_total_memsw_bytes=363094016,mem_unevictable_bytes=0,
timestamp=1465486052.70525 1465486053052811792...
```

View File

@ -116,7 +116,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
for _, v := range m.Slaves {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc)
errorChannel <- m.gatherMainMetrics(c, ":5051", SLAVE, acc)
wg.Done()
return
}(v)
@ -420,8 +420,15 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
FrameworkID string `json:"framework_id"`
Statistics map[string]interface{} `json:"statistics"`
}
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
var metrics []map[string]interface{}
var metrics []TaskStats
host, _, err := net.SplitHostPort(address)
if err != nil {
@ -452,16 +459,18 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
}
for _, task := range metrics {
tags["task_id"] = task["executor_id"].(string)
tags["task_id"] = task.ExecutorID
tags["framework_id"] = task.FrameworkID
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", task)
err = jf.FlattenJSON("", task.Statistics)
if err != nil {
return err
}
timestamp := time.Unix(int64(jf.Fields["timestamp"].(float64)), 0)
acc.AddFields("mesos-tasks", jf.Fields, tags)
acc.AddFields("mesos_tasks", jf.Fields, tags, timestamp)
}
return nil
@ -510,6 +519,14 @@ func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc t
return err
}
if role == MASTER {
if jf.Fields["master/elected"] != 0.0 {
tags["state"] = "leader"
} else {
tags["state"] = "standby"
}
}
acc.AddFields("mesos", jf.Fields, tags)
return nil

View File

@ -345,7 +345,10 @@ func TestMesosSlave(t *testing.T) {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos-tasks", jf.Fields)
acc.AssertContainsFields(
t,
"mesos_tasks",
slaveTaskMetrics["statistics"].(map[string]interface{}))
}
func TestSlaveFilter(t *testing.T) {