From 46595cc439d8e5dbdc345821ad9d141370d00208 Mon Sep 17 00:00:00 2001 From: Sergio Jimenez Date: Wed, 3 Feb 2016 03:31:39 +0100 Subject: [PATCH] plugin(mesos): Reversed removeGroup() * Now the user selects what to push instead of what not * Required to check and improve tests * Missing checks in the code when MetricsCol is empty --- plugins/inputs/mesos/mesos.go | 135 +++++++++++++++-------------- plugins/inputs/mesos/mesos_test.go | 22 +++-- 2 files changed, 80 insertions(+), 77 deletions(-) diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index ef1b8269d..88be7c027 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "io/ioutil" + "log" "net" "net/http" "strings" @@ -15,12 +16,57 @@ import ( ) type Mesos struct { - Timeout string - Servers []string - Blacklist []string + Timeout string + Servers []string + MetricsCol []string `toml:"metrics_collection"` } -func masterBlocks(g string) ([]string, error) { +// SampleConfig returns a sample configuration block +func (m *Mesos) SampleConfig() string { + return sampleConfig +} + +// Description just returns a short description of the Mesos plugin +func (m *Mesos) Description() string { + return "Telegraf plugin for gathering metrics from N Mesos masters" +} + +func (m *Mesos) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + var errorChannel chan error + + if len(m.Servers) == 0 { + m.Servers = []string{"localhost:5050"} + } + + errorChannel = make(chan error, len(m.Servers)*2) + + for _, v := range m.Servers { + wg.Add(1) + go func() { + errorChannel <- m.gatherMetrics(v, acc) + wg.Done() + return + }() + } + + wg.Wait() + close(errorChannel) + errorStrings := []string{} + + for err := range errorChannel { + if err != nil { + errorStrings = append(errorStrings, err.Error()) + } + } + + if len(errorStrings) > 0 { + return errors.New(strings.Join(errorStrings, "\n")) + } + return nil +} + +func masterBlocks(g string) []string { var m map[string][]string m = make(map[string][]string) @@ -153,14 +199,11 @@ func masterBlocks(g string) ([]string, error) { ret, ok := m[g] if !ok { - return nil, errors.New("Unknown group:" + g) + log.Println("Unkown metrics group: ", g) + return []string{} } - return ret, nil -} - -type masterMestrics struct { - resources []string + return ret } var sampleConfig = ` @@ -170,68 +213,30 @@ var sampleConfig = ` # The port can be skipped if using the default (5050) # Default value is localhost:5050. servers = ["localhost:5050"] - blacklist = ["system"] + # Metrics groups to be collected. + # Default, all enabled. + metrics_collection = ["resources","master","system","slaves","frameworks","messages","evqueues","registrar"] ` // removeGroup(), remove blacklisted groups -func (m *Mesos) removeGroup(j *map[string]interface{}) error { - for _, v := range m.Blacklist { - ms, err := masterBlocks(v) - if err != nil { - return err - } - for _, sv := range ms { - delete((*j), sv) - } - } - return nil -} +func (m *Mesos) removeGroup(j *map[string]interface{}) { + var ok bool + u := map[string]bool{} -// SampleConfig returns a sample configuration block -func (m *Mesos) SampleConfig() string { - return sampleConfig -} - -// Description just returns a short description of the Mesos plugin -func (m *Mesos) Description() string { - return "Telegraf plugin for gathering metrics from N Mesos masters" -} - -func (m *Mesos) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - var errorChannel chan error - - if len(m.Servers) == 0 { - m.Servers = []string{"localhost:5050"} - } - - errorChannel = make(chan error, len(m.Servers)*2) - - for _, v := range m.Servers { - wg.Add(1) - go func() { - errorChannel <- m.gatherMetrics(v, acc) - wg.Done() - return - }() - } - - wg.Wait() - close(errorChannel) - errorStrings := []string{} - - for err := range errorChannel { - if err != nil { - errorStrings = append(errorStrings, err.Error()) + for _, v := range m.MetricsCol { + for _, k := range masterBlocks(v) { + u[k] = true } } - if len(errorStrings) > 0 { - return errors.New(strings.Join(errorStrings, "\n")) + for k, _ := range u { + if _, ok = (*j)[k]; ok { + delete((*j), k) + } } - return nil } +// This should not belong to the object func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var jsonOut map[string]interface{} @@ -262,9 +267,9 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { return errors.New("Error decoding JSON response") } - if len(m.Blacklist) > 0 { - m.removeGroup(&jsonOut) - } + //if len(m.Blacklist) > 0 { + // m.removeGroup(&jsonOut) + //} jf := internal.JSONFlattener{} diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index 0bd9d02cb..1f69e4ebb 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -86,6 +86,9 @@ func TestMesosMaster(t *testing.T) { } func TestRemoveGroup(t *testing.T) { + //t.Skip("needs refactoring") + // FIXME: removeGroup() behavior is the opposite as it was, + // this test has to be refactored j := []string{ "resources", "master", "system", "slaves", "frameworks", @@ -97,23 +100,18 @@ func TestRemoveGroup(t *testing.T) { for _, v := range j { m := Mesos{ - Blacklist: []string{v}, + MetricsCol: []string{v}, } - err := m.removeGroup(&mesosMetrics) - if err != nil { - t.Errorf("Error removing non-exiting key: %s.", v) + m.removeGroup(&mesosMetrics) + for _, x := range masterBlocks(v) { + if _, ok := mesosMetrics[x]; ok { + t.Errorf("Found key %s, it should be gone.", x) + } } } if len(mesosMetrics) > 0 { t.Error("Keys were left at slice sample") } - - m := Mesos{ - Blacklist: []string{"fail"}, - } - - if err := m.removeGroup(&mesosMetrics); err == nil { - t.Errorf("Key %s should have returned error.", m.Blacklist[0]) - } + //Test for wrong keys }