plugin(mesos): Initial commit
The plugin is able to query a Mesos master and push the metrics, a blacklist can be configured and a timeout, it's still not used. Added unit test, might be a good idea to have system test using docker.
This commit is contained in:
parent
cabea27adf
commit
5a7c19f47d
|
@ -20,6 +20,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mesos"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
|
||||
|
|
|
@ -0,0 +1,260 @@
|
|||
package mesos
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
type Mesos struct {
|
||||
Timeout string
|
||||
Servers []string
|
||||
Blacklist []string
|
||||
}
|
||||
|
||||
func masterBlocks(g string) ([]string, error) {
|
||||
var m map[string][]string
|
||||
|
||||
m = make(map[string][]string)
|
||||
|
||||
m["resources"] = []string{
|
||||
"master/cpus_percent",
|
||||
"master/cpus_used",
|
||||
"master/cpus_total",
|
||||
"master/cpus_revocable_percent",
|
||||
"master/cpus_revocable_total",
|
||||
"master/cpus_revocable_used",
|
||||
"master/disk_percent",
|
||||
"master/disk_used",
|
||||
"master/disk_total",
|
||||
"master/disk_revocable_percent",
|
||||
"master/disk_revocable_total",
|
||||
"master/disk_revocable_used",
|
||||
"master/mem_percent",
|
||||
"master/mem_used",
|
||||
"master/mem_total",
|
||||
"master/mem_revocable_percent",
|
||||
"master/mem_revocable_total",
|
||||
"master/mem_revocable_used",
|
||||
}
|
||||
|
||||
m["master"] = []string{
|
||||
"master/elected",
|
||||
"master/uptime_secs",
|
||||
}
|
||||
|
||||
m["system"] = []string{
|
||||
"system/cpus_total",
|
||||
"system/load_15min",
|
||||
"system/load_5min",
|
||||
"system/load_1min",
|
||||
"system/mem_free_bytes",
|
||||
"system/mem_total_bytes",
|
||||
}
|
||||
|
||||
m["slaves"] = []string{
|
||||
"master/slave_registrations",
|
||||
"master/slave_removals",
|
||||
"master/slave_reregistrations",
|
||||
"master/slave_shutdowns_scheduled",
|
||||
"master/slave_shutdowns_canceled",
|
||||
"master/slave_shutdowns_completed",
|
||||
"master/slaves_active",
|
||||
"master/slaves_connected",
|
||||
"master/slaves_disconnected",
|
||||
"master/slaves_inactive",
|
||||
}
|
||||
|
||||
m["frameworks"] = []string{
|
||||
"master/frameworks_active",
|
||||
"master/frameworks_connected",
|
||||
"master/frameworks_disconnected",
|
||||
"master/frameworks_inactive",
|
||||
"master/outstanding_offers",
|
||||
}
|
||||
|
||||
m["tasks"] = []string{
|
||||
"master/tasks_error",
|
||||
"master/tasks_failed",
|
||||
"master/tasks_finished",
|
||||
"master/tasks_killed",
|
||||
"master/tasks_lost",
|
||||
"master/tasks_running",
|
||||
"master/tasks_staging",
|
||||
"master/tasks_starting",
|
||||
}
|
||||
|
||||
m["messages"] = []string{
|
||||
"master/invalid_executor_to_framework_messages",
|
||||
"master/invalid_framework_to_executor_messages",
|
||||
"master/invalid_status_update_acknowledgements",
|
||||
"master/invalid_status_updates",
|
||||
"master/dropped_messages",
|
||||
"master/messages_authenticate",
|
||||
"master/messages_deactivate_framework",
|
||||
"master/messages_decline_offers",
|
||||
"master/messages_executor_to_framework",
|
||||
"master/messages_exited_executor",
|
||||
"master/messages_framework_to_executor",
|
||||
"master/messages_kill_task",
|
||||
"master/messages_launch_tasks",
|
||||
"master/messages_reconcile_tasks",
|
||||
"master/messages_register_framework",
|
||||
"master/messages_register_slave",
|
||||
"master/messages_reregister_framework",
|
||||
"master/messages_reregister_slave",
|
||||
"master/messages_resource_request",
|
||||
"master/messages_revive_offers",
|
||||
"master/messages_status_update",
|
||||
"master/messages_status_update_acknowledgement",
|
||||
"master/messages_unregister_framework",
|
||||
"master/messages_unregister_slave",
|
||||
"master/messages_update_slave",
|
||||
"master/recovery_slave_removals",
|
||||
"master/slave_removals/reason_registered",
|
||||
"master/slave_removals/reason_unhealthy",
|
||||
"master/slave_removals/reason_unregistered",
|
||||
"master/valid_framework_to_executor_messages",
|
||||
"master/valid_status_update_acknowledgements",
|
||||
"master/valid_status_updates",
|
||||
"master/task_lost/source_master/reason_invalid_offers",
|
||||
"master/task_lost/source_master/reason_slave_removed",
|
||||
"master/task_lost/source_slave/reason_executor_terminated",
|
||||
"master/valid_executor_to_framework_messages",
|
||||
}
|
||||
|
||||
m["evqueue"] = []string{
|
||||
"master/event_queue_dispatches",
|
||||
"master/event_queue_http_requests",
|
||||
"master/event_queue_messages",
|
||||
}
|
||||
|
||||
m["registrar"] = []string{
|
||||
"registrar/state_fetch_ms",
|
||||
"registrar/state_store_ms",
|
||||
"registrar/state_store_ms/max",
|
||||
"registrar/state_store_ms/min",
|
||||
"registrar/state_store_ms/p50",
|
||||
"registrar/state_store_ms/p90",
|
||||
"registrar/state_store_ms/p95",
|
||||
"registrar/state_store_ms/p99",
|
||||
"registrar/state_store_ms/p999",
|
||||
"registrar/state_store_ms/p9999",
|
||||
}
|
||||
|
||||
ret, ok := m[g]
|
||||
|
||||
if !ok {
|
||||
return nil, errors.New("Unknown group:" + g)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
type masterMestrics struct {
|
||||
resources []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# Timeout, in ms.
|
||||
timeout = 2000
|
||||
# A list of Mesos masters. e.g. master1:5050, master2:5080, etc.
|
||||
# The port can be skipped if using the default (5050)
|
||||
# Default value is localhost:5050.
|
||||
servers = ["localhost:5050"]
|
||||
blacklist = ["system"]
|
||||
`
|
||||
|
||||
// removeGroup(), remove blacklisted groups
|
||||
func (m *Mesos) removeGroup(j *map[string]interface{}) error {
|
||||
for _, v := range m.Blacklist {
|
||||
ms, err := masterBlocks(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, sv := range ms {
|
||||
delete((*j), sv)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SampleConfig returns a sample configuration block
|
||||
func (m *Mesos) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description just returns a short description of the Mesos plugin
|
||||
func (m *Mesos) Description() string {
|
||||
return "Telegraf plugin for gathering metrics from N Mesos masters"
|
||||
}
|
||||
|
||||
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
||||
if len(m.Servers) == 0 {
|
||||
return m.gatherMetrics("localhost:5050", acc)
|
||||
}
|
||||
|
||||
for _, v := range m.Servers {
|
||||
if err := m.gatherMetrics(v, acc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
|
||||
var jsonOut map[string]interface{}
|
||||
|
||||
if _, _, err := net.SplitHostPort(a); err != nil {
|
||||
a = a + ":5050"
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"server": a,
|
||||
}
|
||||
|
||||
// TODO: Use Timeout
|
||||
resp, err := http.Get("http://" + a + "/metrics/snapshot")
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal([]byte(data), &jsonOut); err != nil {
|
||||
return errors.New("Error decoding JSON response")
|
||||
}
|
||||
|
||||
if len(m.Blacklist) > 0 {
|
||||
m.removeGroup(&jsonOut)
|
||||
}
|
||||
|
||||
jf := internal.JSONFlattener{}
|
||||
|
||||
err = jf.FlattenJSON("", jsonOut)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
acc.AddFields("mesos", jf.Fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("mesos", func() telegraf.Input {
|
||||
return &Mesos{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
package mesos
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
var mesosMetrics map[string]interface{}
|
||||
var ts *httptest.Server
|
||||
|
||||
func generateMetrics() {
|
||||
mesosMetrics = make(map[string]interface{})
|
||||
|
||||
metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total",
|
||||
"master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used",
|
||||
"master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent",
|
||||
"master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent",
|
||||
"master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total",
|
||||
"master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total",
|
||||
"system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes",
|
||||
"system/mem_total_bytes", "master/slave_registrations", "master/slave_removals",
|
||||
"master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled",
|
||||
"master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected",
|
||||
"master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active",
|
||||
"master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive",
|
||||
"master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished",
|
||||
"master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging",
|
||||
"master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages",
|
||||
"master/invalid_status_update_acknowledgements", "master/invalid_status_updates",
|
||||
"master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework",
|
||||
"master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor",
|
||||
"master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks",
|
||||
"master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave",
|
||||
"master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request",
|
||||
"master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement",
|
||||
"master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave",
|
||||
"master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy",
|
||||
"master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements",
|
||||
"master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers",
|
||||
"master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated",
|
||||
"master/valid_executor_to_framework_messages", "master/event_queue_dispatches",
|
||||
"master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms",
|
||||
"registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min",
|
||||
"registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95",
|
||||
"registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"}
|
||||
|
||||
for _, k := range metricNames {
|
||||
mesosMetrics[k] = rand.Float64()
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
generateMetrics()
|
||||
r := http.NewServeMux()
|
||||
r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(mesosMetrics)
|
||||
})
|
||||
ts = httptest.NewServer(r)
|
||||
rc := m.Run()
|
||||
ts.Close()
|
||||
os.Exit(rc)
|
||||
}
|
||||
|
||||
func TestMesosMaster(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
|
||||
m := Mesos{
|
||||
Servers: []string{ts.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
err := m.Gather(&acc)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
|
||||
acc.AssertContainsFields(t, "mesos", mesosMetrics)
|
||||
}
|
||||
|
||||
func TestRemoveGroup(t *testing.T) {
|
||||
j := []string{
|
||||
"resources", "master",
|
||||
"system", "slaves", "frameworks",
|
||||
"tasks", "messages", "evqueue",
|
||||
"messages", "registrar",
|
||||
}
|
||||
|
||||
generateMetrics()
|
||||
|
||||
for _, v := range j {
|
||||
m := Mesos{
|
||||
Blacklist: []string{v},
|
||||
}
|
||||
err := m.removeGroup(&mesosMetrics)
|
||||
if err != nil {
|
||||
t.Errorf("Error removing non-exiting key: %s.", v)
|
||||
}
|
||||
}
|
||||
|
||||
if len(mesosMetrics) > 0 {
|
||||
t.Error("Keys were left at slice sample")
|
||||
}
|
||||
|
||||
m := Mesos{
|
||||
Blacklist: []string{"fail"},
|
||||
}
|
||||
|
||||
if err := m.removeGroup(&mesosMetrics); err == nil {
|
||||
t.Errorf("Key %s should have returned error.", m.Blacklist[0])
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue