telegraf/plugins/inputs/mesos/mesos.go

331 lines
7.4 KiB
Go

package mesos
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
type Mesos struct {
Timeout int
Masters []string
MasterCols []string `toml:"master_collections"`
}
var defaultMetrics = []string{
"resources", "master", "system", "slaves", "frameworks",
"tasks", "messages", "evqueue", "messages", "registrar",
}
var sampleConfig = `
# Timeout, in ms.
timeout = 100
# A list of Mesos masters, default value is localhost:5050.
masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"]
`
// SampleConfig returns a sample configuration block
func (m *Mesos) SampleConfig() string {
return sampleConfig
}
// Description just returns a short description of the Mesos plugin
func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var errorChannel chan error
if len(m.Masters) == 0 {
m.Masters = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Masters)*2)
for _, v := range m.Masters {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMetrics(c, acc)
wg.Done()
return
}(v)
}
wg.Wait()
close(errorChannel)
errorStrings := []string{}
// Gather all errors for returning them at once
for err := range errorChannel {
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) > 0 {
return errors.New(strings.Join(errorStrings, "\n"))
}
return nil
}
// metricsDiff() returns set names for removal
func metricsDiff(w []string) []string {
b := []string{}
s := make(map[string]bool)
if len(w) == 0 {
return b
}
for _, v := range w {
s[v] = true
}
for _, d := range defaultMetrics {
if _, ok := s[d]; !ok {
b = append(b, d)
}
}
return b
}
// masterBlocks serves as kind of metrics registry groupping them in sets
func masterBlocks(g string) []string {
var m map[string][]string
m = make(map[string][]string)
m["resources"] = []string{
"master/cpus_percent",
"master/cpus_used",
"master/cpus_total",
"master/cpus_revocable_percent",
"master/cpus_revocable_total",
"master/cpus_revocable_used",
"master/disk_percent",
"master/disk_used",
"master/disk_total",
"master/disk_revocable_percent",
"master/disk_revocable_total",
"master/disk_revocable_used",
"master/mem_percent",
"master/mem_used",
"master/mem_total",
"master/mem_revocable_percent",
"master/mem_revocable_total",
"master/mem_revocable_used",
}
m["master"] = []string{
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["slaves"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
ret, ok := m[g]
if !ok {
log.Println("[mesos] Unkown metrics group: ", g)
return []string{}
}
return ret
}
// removeGroup(), remove unwanted sets
func (m *Mesos) removeGroup(j *map[string]interface{}) {
var ok bool
b := metricsDiff(m.MasterCols)
for _, k := range b {
for _, v := range masterBlocks(k) {
if _, ok = (*j)[v]; ok {
delete((*j), v)
}
}
}
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// This should not belong to the object
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + ":5050"
}
tags := map[string]string{
"server": host,
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil {
return err
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if err = json.Unmarshal([]byte(data), &jsonOut); err != nil {
return errors.New("Error decoding JSON response")
}
m.removeGroup(&jsonOut)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", jsonOut)
if err != nil {
return err
}
acc.AddFields("mesos", jf.Fields, tags)
return nil
}
func init() {
inputs.Add("mesos", func() telegraf.Input {
return &Mesos{}
})
}