diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 9f2122e21..74331e54b 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -20,6 +20,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" + _ "github.com/influxdata/telegraf/plugins/inputs/mesos" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go new file mode 100644 index 000000000..835c14b78 --- /dev/null +++ b/plugins/inputs/mesos/mesos.go @@ -0,0 +1,260 @@ +package mesos + +import ( + "encoding/json" + "errors" + "io/ioutil" + "net" + "net/http" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Mesos struct { + Timeout string + Servers []string + Blacklist []string +} + +func masterBlocks(g string) ([]string, error) { + var m map[string][]string + + m = make(map[string][]string) + + m["resources"] = []string{ + "master/cpus_percent", + "master/cpus_used", + "master/cpus_total", + "master/cpus_revocable_percent", + "master/cpus_revocable_total", + "master/cpus_revocable_used", + "master/disk_percent", + "master/disk_used", + "master/disk_total", + "master/disk_revocable_percent", + "master/disk_revocable_total", + "master/disk_revocable_used", + "master/mem_percent", + "master/mem_used", + "master/mem_total", + "master/mem_revocable_percent", + "master/mem_revocable_total", + "master/mem_revocable_used", + } + + m["master"] = []string{ + "master/elected", + "master/uptime_secs", + } + + m["system"] = []string{ + "system/cpus_total", + "system/load_15min", + "system/load_5min", + "system/load_1min", + "system/mem_free_bytes", + "system/mem_total_bytes", + } + + m["slaves"] = []string{ + "master/slave_registrations", + "master/slave_removals", + "master/slave_reregistrations", + "master/slave_shutdowns_scheduled", + "master/slave_shutdowns_canceled", + "master/slave_shutdowns_completed", + "master/slaves_active", + "master/slaves_connected", + "master/slaves_disconnected", + "master/slaves_inactive", + } + + m["frameworks"] = []string{ + "master/frameworks_active", + "master/frameworks_connected", + "master/frameworks_disconnected", + "master/frameworks_inactive", + "master/outstanding_offers", + } + + m["tasks"] = []string{ + "master/tasks_error", + "master/tasks_failed", + "master/tasks_finished", + "master/tasks_killed", + "master/tasks_lost", + "master/tasks_running", + "master/tasks_staging", + "master/tasks_starting", + } + + m["messages"] = []string{ + "master/invalid_executor_to_framework_messages", + "master/invalid_framework_to_executor_messages", + "master/invalid_status_update_acknowledgements", + "master/invalid_status_updates", + "master/dropped_messages", + "master/messages_authenticate", + "master/messages_deactivate_framework", + "master/messages_decline_offers", + "master/messages_executor_to_framework", + "master/messages_exited_executor", + "master/messages_framework_to_executor", + "master/messages_kill_task", + "master/messages_launch_tasks", + "master/messages_reconcile_tasks", + "master/messages_register_framework", + "master/messages_register_slave", + "master/messages_reregister_framework", + "master/messages_reregister_slave", + "master/messages_resource_request", + "master/messages_revive_offers", + "master/messages_status_update", + "master/messages_status_update_acknowledgement", + "master/messages_unregister_framework", + "master/messages_unregister_slave", + "master/messages_update_slave", + "master/recovery_slave_removals", + "master/slave_removals/reason_registered", + "master/slave_removals/reason_unhealthy", + "master/slave_removals/reason_unregistered", + "master/valid_framework_to_executor_messages", + "master/valid_status_update_acknowledgements", + "master/valid_status_updates", + "master/task_lost/source_master/reason_invalid_offers", + "master/task_lost/source_master/reason_slave_removed", + "master/task_lost/source_slave/reason_executor_terminated", + "master/valid_executor_to_framework_messages", + } + + m["evqueue"] = []string{ + "master/event_queue_dispatches", + "master/event_queue_http_requests", + "master/event_queue_messages", + } + + m["registrar"] = []string{ + "registrar/state_fetch_ms", + "registrar/state_store_ms", + "registrar/state_store_ms/max", + "registrar/state_store_ms/min", + "registrar/state_store_ms/p50", + "registrar/state_store_ms/p90", + "registrar/state_store_ms/p95", + "registrar/state_store_ms/p99", + "registrar/state_store_ms/p999", + "registrar/state_store_ms/p9999", + } + + ret, ok := m[g] + + if !ok { + return nil, errors.New("Unknown group:" + g) + } + + return ret, nil +} + +type masterMestrics struct { + resources []string +} + +var sampleConfig = ` + # Timeout, in ms. + timeout = 2000 + # A list of Mesos masters. e.g. master1:5050, master2:5080, etc. + # The port can be skipped if using the default (5050) + # Default value is localhost:5050. + servers = ["localhost:5050"] + blacklist = ["system"] +` + +// removeGroup(), remove blacklisted groups +func (m *Mesos) removeGroup(j *map[string]interface{}) error { + for _, v := range m.Blacklist { + ms, err := masterBlocks(v) + if err != nil { + return err + } + for _, sv := range ms { + delete((*j), sv) + } + } + return nil +} + +// SampleConfig returns a sample configuration block +func (m *Mesos) SampleConfig() string { + return sampleConfig +} + +// Description just returns a short description of the Mesos plugin +func (m *Mesos) Description() string { + return "Telegraf plugin for gathering metrics from N Mesos masters" +} + +func (m *Mesos) Gather(acc telegraf.Accumulator) error { + if len(m.Servers) == 0 { + return m.gatherMetrics("localhost:5050", acc) + } + + for _, v := range m.Servers { + if err := m.gatherMetrics(v, acc); err != nil { + return err + } + } + return nil +} + +func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { + var jsonOut map[string]interface{} + + if _, _, err := net.SplitHostPort(a); err != nil { + a = a + ":5050" + } + + tags := map[string]string{ + "server": a, + } + + // TODO: Use Timeout + resp, err := http.Get("http://" + a + "/metrics/snapshot") + + if err != nil { + return err + } + + data, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return err + } + + if err = json.Unmarshal([]byte(data), &jsonOut); err != nil { + return errors.New("Error decoding JSON response") + } + + if len(m.Blacklist) > 0 { + m.removeGroup(&jsonOut) + } + + jf := internal.JSONFlattener{} + + err = jf.FlattenJSON("", jsonOut) + + if err != nil { + return err + } + + acc.AddFields("mesos", jf.Fields, tags) + + return nil +} + +func init() { + inputs.Add("mesos", func() telegraf.Input { + return &Mesos{} + }) +} diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go new file mode 100644 index 000000000..0bd9d02cb --- /dev/null +++ b/plugins/inputs/mesos/mesos_test.go @@ -0,0 +1,119 @@ +package mesos + +import ( + "encoding/json" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +var mesosMetrics map[string]interface{} +var ts *httptest.Server + +func generateMetrics() { + mesosMetrics = make(map[string]interface{}) + + metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total", + "master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", + "master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", + "master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", + "master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", + "master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total", + "system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", + "system/mem_total_bytes", "master/slave_registrations", "master/slave_removals", + "master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", + "master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", + "master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active", + "master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", + "master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished", + "master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", + "master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", + "master/invalid_status_update_acknowledgements", "master/invalid_status_updates", + "master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", + "master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", + "master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", + "master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", + "master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", + "master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", + "master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", + "master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", + "master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", + "master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", + "master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", + "master/valid_executor_to_framework_messages", "master/event_queue_dispatches", + "master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms", + "registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", + "registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", + "registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"} + + for _, k := range metricNames { + mesosMetrics[k] = rand.Float64() + } +} + +func TestMain(m *testing.M) { + generateMetrics() + r := http.NewServeMux() + r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(mesosMetrics) + }) + ts = httptest.NewServer(r) + rc := m.Run() + ts.Close() + os.Exit(rc) +} + +func TestMesosMaster(t *testing.T) { + var acc testutil.Accumulator + + m := Mesos{ + Servers: []string{ts.Listener.Addr().String()}, + } + + err := m.Gather(&acc) + + if err != nil { + t.Errorf(err.Error()) + } + + acc.AssertContainsFields(t, "mesos", mesosMetrics) +} + +func TestRemoveGroup(t *testing.T) { + j := []string{ + "resources", "master", + "system", "slaves", "frameworks", + "tasks", "messages", "evqueue", + "messages", "registrar", + } + + generateMetrics() + + for _, v := range j { + m := Mesos{ + Blacklist: []string{v}, + } + err := m.removeGroup(&mesosMetrics) + if err != nil { + t.Errorf("Error removing non-exiting key: %s.", v) + } + } + + if len(mesosMetrics) > 0 { + t.Error("Keys were left at slice sample") + } + + m := Mesos{ + Blacklist: []string{"fail"}, + } + + if err := m.removeGroup(&mesosMetrics); err == nil { + t.Errorf("Key %s should have returned error.", m.Blacklist[0]) + } +}