292 lines
6.6 KiB
Go
292 lines
6.6 KiB
Go
package mesos
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
type Mesos struct {
|
|
Timeout string
|
|
Servers []string
|
|
MetricsCol []string `toml:"metrics_collection"`
|
|
}
|
|
|
|
// SampleConfig returns a sample configuration block
|
|
func (m *Mesos) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description just returns a short description of the Mesos plugin
|
|
func (m *Mesos) Description() string {
|
|
return "Telegraf plugin for gathering metrics from N Mesos masters"
|
|
}
|
|
|
|
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
|
var wg sync.WaitGroup
|
|
var errorChannel chan error
|
|
|
|
if len(m.Servers) == 0 {
|
|
m.Servers = []string{"localhost:5050"}
|
|
}
|
|
|
|
errorChannel = make(chan error, len(m.Servers)*2)
|
|
|
|
for _, v := range m.Servers {
|
|
wg.Add(1)
|
|
go func() {
|
|
errorChannel <- m.gatherMetrics(v, acc)
|
|
wg.Done()
|
|
return
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errorChannel)
|
|
errorStrings := []string{}
|
|
|
|
for err := range errorChannel {
|
|
if err != nil {
|
|
errorStrings = append(errorStrings, err.Error())
|
|
}
|
|
}
|
|
|
|
if len(errorStrings) > 0 {
|
|
return errors.New(strings.Join(errorStrings, "\n"))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func masterBlocks(g string) []string {
|
|
var m map[string][]string
|
|
|
|
m = make(map[string][]string)
|
|
|
|
m["resources"] = []string{
|
|
"master/cpus_percent",
|
|
"master/cpus_used",
|
|
"master/cpus_total",
|
|
"master/cpus_revocable_percent",
|
|
"master/cpus_revocable_total",
|
|
"master/cpus_revocable_used",
|
|
"master/disk_percent",
|
|
"master/disk_used",
|
|
"master/disk_total",
|
|
"master/disk_revocable_percent",
|
|
"master/disk_revocable_total",
|
|
"master/disk_revocable_used",
|
|
"master/mem_percent",
|
|
"master/mem_used",
|
|
"master/mem_total",
|
|
"master/mem_revocable_percent",
|
|
"master/mem_revocable_total",
|
|
"master/mem_revocable_used",
|
|
}
|
|
|
|
m["master"] = []string{
|
|
"master/elected",
|
|
"master/uptime_secs",
|
|
}
|
|
|
|
m["system"] = []string{
|
|
"system/cpus_total",
|
|
"system/load_15min",
|
|
"system/load_5min",
|
|
"system/load_1min",
|
|
"system/mem_free_bytes",
|
|
"system/mem_total_bytes",
|
|
}
|
|
|
|
m["slaves"] = []string{
|
|
"master/slave_registrations",
|
|
"master/slave_removals",
|
|
"master/slave_reregistrations",
|
|
"master/slave_shutdowns_scheduled",
|
|
"master/slave_shutdowns_canceled",
|
|
"master/slave_shutdowns_completed",
|
|
"master/slaves_active",
|
|
"master/slaves_connected",
|
|
"master/slaves_disconnected",
|
|
"master/slaves_inactive",
|
|
}
|
|
|
|
m["frameworks"] = []string{
|
|
"master/frameworks_active",
|
|
"master/frameworks_connected",
|
|
"master/frameworks_disconnected",
|
|
"master/frameworks_inactive",
|
|
"master/outstanding_offers",
|
|
}
|
|
|
|
m["tasks"] = []string{
|
|
"master/tasks_error",
|
|
"master/tasks_failed",
|
|
"master/tasks_finished",
|
|
"master/tasks_killed",
|
|
"master/tasks_lost",
|
|
"master/tasks_running",
|
|
"master/tasks_staging",
|
|
"master/tasks_starting",
|
|
}
|
|
|
|
m["messages"] = []string{
|
|
"master/invalid_executor_to_framework_messages",
|
|
"master/invalid_framework_to_executor_messages",
|
|
"master/invalid_status_update_acknowledgements",
|
|
"master/invalid_status_updates",
|
|
"master/dropped_messages",
|
|
"master/messages_authenticate",
|
|
"master/messages_deactivate_framework",
|
|
"master/messages_decline_offers",
|
|
"master/messages_executor_to_framework",
|
|
"master/messages_exited_executor",
|
|
"master/messages_framework_to_executor",
|
|
"master/messages_kill_task",
|
|
"master/messages_launch_tasks",
|
|
"master/messages_reconcile_tasks",
|
|
"master/messages_register_framework",
|
|
"master/messages_register_slave",
|
|
"master/messages_reregister_framework",
|
|
"master/messages_reregister_slave",
|
|
"master/messages_resource_request",
|
|
"master/messages_revive_offers",
|
|
"master/messages_status_update",
|
|
"master/messages_status_update_acknowledgement",
|
|
"master/messages_unregister_framework",
|
|
"master/messages_unregister_slave",
|
|
"master/messages_update_slave",
|
|
"master/recovery_slave_removals",
|
|
"master/slave_removals/reason_registered",
|
|
"master/slave_removals/reason_unhealthy",
|
|
"master/slave_removals/reason_unregistered",
|
|
"master/valid_framework_to_executor_messages",
|
|
"master/valid_status_update_acknowledgements",
|
|
"master/valid_status_updates",
|
|
"master/task_lost/source_master/reason_invalid_offers",
|
|
"master/task_lost/source_master/reason_slave_removed",
|
|
"master/task_lost/source_slave/reason_executor_terminated",
|
|
"master/valid_executor_to_framework_messages",
|
|
}
|
|
|
|
m["evqueue"] = []string{
|
|
"master/event_queue_dispatches",
|
|
"master/event_queue_http_requests",
|
|
"master/event_queue_messages",
|
|
}
|
|
|
|
m["registrar"] = []string{
|
|
"registrar/state_fetch_ms",
|
|
"registrar/state_store_ms",
|
|
"registrar/state_store_ms/max",
|
|
"registrar/state_store_ms/min",
|
|
"registrar/state_store_ms/p50",
|
|
"registrar/state_store_ms/p90",
|
|
"registrar/state_store_ms/p95",
|
|
"registrar/state_store_ms/p99",
|
|
"registrar/state_store_ms/p999",
|
|
"registrar/state_store_ms/p9999",
|
|
}
|
|
|
|
ret, ok := m[g]
|
|
|
|
if !ok {
|
|
log.Println("Unkown metrics group: ", g)
|
|
return []string{}
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
var sampleConfig = `
|
|
# Timeout, in ms.
|
|
timeout = 2000
|
|
# A list of Mesos masters. e.g. master1:5050, master2:5080, etc.
|
|
# The port can be skipped if using the default (5050)
|
|
# Default value is localhost:5050.
|
|
servers = ["localhost:5050"]
|
|
# Metrics groups to be collected.
|
|
# Default, all enabled.
|
|
metrics_collection = ["resources","master","system","slaves","frameworks","messages","evqueues","registrar"]
|
|
`
|
|
|
|
// removeGroup(), remove blacklisted groups
|
|
func (m *Mesos) removeGroup(j *map[string]interface{}) {
|
|
var ok bool
|
|
u := map[string]bool{}
|
|
|
|
for _, v := range m.MetricsCol {
|
|
for _, k := range masterBlocks(v) {
|
|
u[k] = true
|
|
}
|
|
}
|
|
|
|
for k, _ := range u {
|
|
if _, ok = (*j)[k]; ok {
|
|
delete((*j), k)
|
|
}
|
|
}
|
|
}
|
|
|
|
// This should not belong to the object
|
|
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
|
|
var jsonOut map[string]interface{}
|
|
|
|
host, _, err := net.SplitHostPort(a)
|
|
if err != nil {
|
|
host = a
|
|
a = a + ":5050"
|
|
}
|
|
|
|
tags := map[string]string{
|
|
"server": host,
|
|
}
|
|
|
|
// TODO: Use Timeout
|
|
resp, err := http.Get("http://" + a + "/metrics/snapshot")
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := ioutil.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = json.Unmarshal([]byte(data), &jsonOut); err != nil {
|
|
return errors.New("Error decoding JSON response")
|
|
}
|
|
|
|
//if len(m.Blacklist) > 0 {
|
|
// m.removeGroup(&jsonOut)
|
|
//}
|
|
|
|
jf := internal.JSONFlattener{}
|
|
|
|
err = jf.FlattenJSON("", jsonOut)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
acc.AddFields("mesos", jf.Fields, tags)
|
|
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("mesos", func() telegraf.Input {
|
|
return &Mesos{}
|
|
})
|
|
}
|