package mesos import ( "encoding/json" "errors" "io/ioutil" "log" "net" "net/http" "strconv" "strings" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) type Mesos struct { Timeout int Masters []string MasterCols []string `toml:"master_collections"` } var defaultMetrics = []string{ "resources", "master", "system", "slaves", "frameworks", "tasks", "messages", "evqueue", "messages", "registrar", } var sampleConfig = ` # Timeout, in ms. timeout = 100 # A list of Mesos masters, default value is localhost:5050. masters = ["localhost:5050"] # Metrics groups to be collected, by default, all enabled. master_collections = [ "resources", "master", "system", "slaves", "frameworks", "messages", "evqueue", "registrar", ] ` // SampleConfig returns a sample configuration block func (m *Mesos) SampleConfig() string { return sampleConfig } // Description just returns a short description of the Mesos plugin func (m *Mesos) Description() string { return "Telegraf plugin for gathering metrics from N Mesos masters" } // Gather() metrics from given list of Mesos Masters func (m *Mesos) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var errorChannel chan error if len(m.Masters) == 0 { m.Masters = []string{"localhost:5050"} } errorChannel = make(chan error, len(m.Masters)*2) for _, v := range m.Masters { wg.Add(1) go func(c string) { errorChannel <- m.gatherMetrics(c, acc) wg.Done() return }(v) } wg.Wait() close(errorChannel) errorStrings := []string{} // Gather all errors for returning them at once for err := range errorChannel { if err != nil { errorStrings = append(errorStrings, err.Error()) } } if len(errorStrings) > 0 { return errors.New(strings.Join(errorStrings, "\n")) } return nil } // metricsDiff() returns set names for removal func metricsDiff(w []string) []string { b := []string{} s := make(map[string]bool) if len(w) == 0 { return b } for _, v := range w { s[v] = true } for _, d := range defaultMetrics { if _, ok := s[d]; !ok { b = append(b, d) } } return b } // masterBlocks serves as kind of metrics registry groupping them in sets func masterBlocks(g string) []string { var m map[string][]string m = make(map[string][]string) m["resources"] = []string{ "master/cpus_percent", "master/cpus_used", "master/cpus_total", "master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", "master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", "master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", "master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", "master/mem_revocable_used", } m["master"] = []string{ "master/elected", "master/uptime_secs", } m["system"] = []string{ "system/cpus_total", "system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", "system/mem_total_bytes", } m["slaves"] = []string{ "master/slave_registrations", "master/slave_removals", "master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", "master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", "master/slaves_disconnected", "master/slaves_inactive", } m["frameworks"] = []string{ "master/frameworks_active", "master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", "master/outstanding_offers", } m["tasks"] = []string{ "master/tasks_error", "master/tasks_failed", "master/tasks_finished", "master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", "master/tasks_starting", } m["messages"] = []string{ "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", "master/invalid_status_update_acknowledgements", "master/invalid_status_updates", "master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", "master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", "master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", "master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", "master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", "master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", "master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", "master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", "master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", "master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", "master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", "master/valid_executor_to_framework_messages", } m["evqueue"] = []string{ "master/event_queue_dispatches", "master/event_queue_http_requests", "master/event_queue_messages", } m["registrar"] = []string{ "registrar/state_fetch_ms", "registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", "registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", "registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999", } ret, ok := m[g] if !ok { log.Println("[mesos] Unkown metrics group: ", g) return []string{} } return ret } // removeGroup(), remove unwanted sets func (m *Mesos) removeGroup(j *map[string]interface{}) { var ok bool b := metricsDiff(m.MasterCols) for _, k := range b { for _, v := range masterBlocks(k) { if _, ok = (*j)[v]; ok { delete((*j), v) } } } } var tr = &http.Transport{ ResponseHeaderTimeout: time.Duration(3 * time.Second), } var client = &http.Client{ Transport: tr, Timeout: time.Duration(4 * time.Second), } // This should not belong to the object func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var jsonOut map[string]interface{} host, _, err := net.SplitHostPort(a) if err != nil { host = a a = a + ":5050" } tags := map[string]string{ "server": host, } if m.Timeout == 0 { log.Println("[mesos] Missing timeout value, setting default value (100ms)") m.Timeout = 100 } ts := strconv.Itoa(m.Timeout) + "ms" resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts) if err != nil { return err } data, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { return err } if err = json.Unmarshal([]byte(data), &jsonOut); err != nil { return errors.New("Error decoding JSON response") } m.removeGroup(&jsonOut) jf := jsonparser.JSONFlattener{} err = jf.FlattenJSON("", jsonOut) if err != nil { return err } acc.AddFields("mesos", jf.Fields, tags) return nil } func init() { inputs.Add("mesos", func() telegraf.Input { return &Mesos{} }) }