plugin(mesos): Reversed removeGroup()

* Now the user selects what to push instead of what not
* Required to check and improve tests
* Missing checks in the code when MetricsCol is empty
This commit is contained in:
Sergio Jimenez 2016-02-03 03:31:39 +01:00
parent 1d50d62a79
commit 52b329be4e
2 changed files with 80 additions and 77 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
"log"
"net" "net"
"net/http" "net/http"
"strings" "strings"
@ -17,10 +18,55 @@ import (
type Mesos struct { type Mesos struct {
Timeout string Timeout string
Servers []string Servers []string
Blacklist []string MetricsCol []string `toml:"metrics_collection"`
} }
func masterBlocks(g string) ([]string, error) { // SampleConfig returns a sample configuration block
func (m *Mesos) SampleConfig() string {
return sampleConfig
}
// Description just returns a short description of the Mesos plugin
func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var errorChannel chan error
if len(m.Servers) == 0 {
m.Servers = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Servers)*2)
for _, v := range m.Servers {
wg.Add(1)
go func() {
errorChannel <- m.gatherMetrics(v, acc)
wg.Done()
return
}()
}
wg.Wait()
close(errorChannel)
errorStrings := []string{}
for err := range errorChannel {
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) > 0 {
return errors.New(strings.Join(errorStrings, "\n"))
}
return nil
}
func masterBlocks(g string) []string {
var m map[string][]string var m map[string][]string
m = make(map[string][]string) m = make(map[string][]string)
@ -153,14 +199,11 @@ func masterBlocks(g string) ([]string, error) {
ret, ok := m[g] ret, ok := m[g]
if !ok { if !ok {
return nil, errors.New("Unknown group:" + g) log.Println("Unkown metrics group: ", g)
return []string{}
} }
return ret, nil return ret
}
type masterMestrics struct {
resources []string
} }
var sampleConfig = ` var sampleConfig = `
@ -170,68 +213,30 @@ var sampleConfig = `
# The port can be skipped if using the default (5050) # The port can be skipped if using the default (5050)
# Default value is localhost:5050. # Default value is localhost:5050.
servers = ["localhost:5050"] servers = ["localhost:5050"]
blacklist = ["system"] # Metrics groups to be collected.
# Default, all enabled.
metrics_collection = ["resources","master","system","slaves","frameworks","messages","evqueues","registrar"]
` `
// removeGroup(), remove blacklisted groups // removeGroup(), remove blacklisted groups
func (m *Mesos) removeGroup(j *map[string]interface{}) error { func (m *Mesos) removeGroup(j *map[string]interface{}) {
for _, v := range m.Blacklist { var ok bool
ms, err := masterBlocks(v) u := map[string]bool{}
if err != nil {
return err for _, v := range m.MetricsCol {
} for _, k := range masterBlocks(v) {
for _, sv := range ms { u[k] = true
delete((*j), sv) }
}
for k, _ := range u {
if _, ok = (*j)[k]; ok {
delete((*j), k)
} }
} }
return nil
}
// SampleConfig returns a sample configuration block
func (m *Mesos) SampleConfig() string {
return sampleConfig
}
// Description just returns a short description of the Mesos plugin
func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var errorChannel chan error
if len(m.Servers) == 0 {
m.Servers = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Servers)*2)
for _, v := range m.Servers {
wg.Add(1)
go func() {
errorChannel <- m.gatherMetrics(v, acc)
wg.Done()
return
}()
}
wg.Wait()
close(errorChannel)
errorStrings := []string{}
for err := range errorChannel {
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) > 0 {
return errors.New(strings.Join(errorStrings, "\n"))
}
return nil
} }
// This should not belong to the object
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{} var jsonOut map[string]interface{}
@ -262,9 +267,9 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
return errors.New("Error decoding JSON response") return errors.New("Error decoding JSON response")
} }
if len(m.Blacklist) > 0 { //if len(m.Blacklist) > 0 {
m.removeGroup(&jsonOut) // m.removeGroup(&jsonOut)
} //}
jf := internal.JSONFlattener{} jf := internal.JSONFlattener{}

View File

@ -86,6 +86,9 @@ func TestMesosMaster(t *testing.T) {
} }
func TestRemoveGroup(t *testing.T) { func TestRemoveGroup(t *testing.T) {
//t.Skip("needs refactoring")
// FIXME: removeGroup() behavior is the opposite as it was,
// this test has to be refactored
j := []string{ j := []string{
"resources", "master", "resources", "master",
"system", "slaves", "frameworks", "system", "slaves", "frameworks",
@ -97,23 +100,18 @@ func TestRemoveGroup(t *testing.T) {
for _, v := range j { for _, v := range j {
m := Mesos{ m := Mesos{
Blacklist: []string{v}, MetricsCol: []string{v},
}
m.removeGroup(&mesosMetrics)
for _, x := range masterBlocks(v) {
if _, ok := mesosMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x)
} }
err := m.removeGroup(&mesosMetrics)
if err != nil {
t.Errorf("Error removing non-exiting key: %s.", v)
} }
} }
if len(mesosMetrics) > 0 { if len(mesosMetrics) > 0 {
t.Error("Keys were left at slice sample") t.Error("Keys were left at slice sample")
} }
//Test for wrong keys
m := Mesos{
Blacklist: []string{"fail"},
}
if err := m.removeGroup(&mesosMetrics); err == nil {
t.Errorf("Key %s should have returned error.", m.Blacklist[0])
}
} }