From 41374aabcbd21860103570e3967b88b711000de9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sat, 19 Dec 2015 15:29:07 -0700 Subject: [PATCH] 0.3.0 Removing internal parallelism: httpjson and exec --- plugins/exec/exec.go | 119 ++++++++--------------------------- plugins/httpjson/httpjson.go | 86 +++++++++++-------------- plugins/jolokia/jolokia.go | 11 ---- 3 files changed, 62 insertions(+), 154 deletions(-) diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index 1571b6bf4..87a8cc72f 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -3,13 +3,8 @@ package exec import ( "bytes" "encoding/json" - "errors" "fmt" - "math" "os/exec" - "strings" - "sync" - "time" "github.com/gonuts/go-shellquote" @@ -18,47 +13,28 @@ import ( ) const sampleConfig = ` - # specify commands via an array of tables - [[plugins.exec.commands]] # the command to run command = "/usr/bin/mycollector --foo=bar" # name of the command (used as a prefix for measurements) name = "mycollector" - - # Only run this command if it has been at least this many - # seconds since it last ran - interval = 10 ` type Exec struct { - Commands []*Command - runner Runner - clock Clock -} + Command string + Name string -type Command struct { - Command string - Name string - Interval int - lastRunAt time.Time + runner Runner } type Runner interface { - Run(*Command) ([]byte, error) -} - -type Clock interface { - Now() time.Time + Run(*Exec) ([]byte, error) } type CommandRunner struct{} -type RealClock struct{} - -func (c CommandRunner) Run(command *Command) ([]byte, error) { - command.lastRunAt = time.Now() - split_cmd, err := shellquote.Split(command.Command) +func (c CommandRunner) Run(e *Exec) ([]byte, error) { + split_cmd, err := shellquote.Split(e.Command) if err != nil || len(split_cmd) == 0 { return nil, fmt.Errorf("exec: unable to parse command, %s", err) } @@ -68,18 +44,14 @@ func (c CommandRunner) Run(command *Command) ([]byte, error) { cmd.Stdout = &out if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("exec: %s for command '%s'", err, command.Command) + return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command) } return out.Bytes(), nil } -func (c RealClock) Now() time.Time { - return time.Now() -} - func NewExec() *Exec { - return &Exec{runner: CommandRunner{}, clock: RealClock{}} + return &Exec{runner: CommandRunner{}} } func (e *Exec) SampleConfig() string { @@ -91,72 +63,31 @@ func (e *Exec) Description() string { } func (e *Exec) Gather(acc plugins.Accumulator) error { - var wg sync.WaitGroup - - errorChannel := make(chan error, len(e.Commands)) - - for _, c := range e.Commands { - wg.Add(1) - go func(c *Command, acc plugins.Accumulator) { - defer wg.Done() - err := e.gatherCommand(c, acc) - if err != nil { - errorChannel <- err - } - }(c, acc) + out, err := e.runner.Run(e) + if err != nil { + return err } - wg.Wait() - close(errorChannel) - - // Get all errors and return them as one giant error - errorStrings := []string{} - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) + var jsonOut interface{} + err = json.Unmarshal(out, &jsonOut) + if err != nil { + return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", + e.Command, err) } - if len(errorStrings) == 0 { - return nil + f := internal.JSONFlattener{} + err = f.FlattenJSON("", jsonOut) + if err != nil { + return err } - return errors.New(strings.Join(errorStrings, "\n")) -} -func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { - secondsSinceLastRun := 0.0 - - if c.lastRunAt.Unix() == 0 { // means time is uninitialized - secondsSinceLastRun = math.Inf(1) + var msrmnt_name string + if e.Name == "" { + msrmnt_name = "exec" } else { - secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds() - } - - if secondsSinceLastRun >= float64(c.Interval) { - out, err := e.runner.Run(c) - if err != nil { - return err - } - - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", - c.Command, err) - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - return err - } - - var msrmnt_name string - if c.Name == "" { - msrmnt_name = "exec" - } else { - msrmnt_name = "exec_" + c.Name - } - acc.AddFields(msrmnt_name, f.Fields, nil) + msrmnt_name = "exec_" + e.Name } + acc.AddFields(msrmnt_name, f.Fields, nil) return nil } diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go index e2b44b7a9..b89f83576 100644 --- a/plugins/httpjson/httpjson.go +++ b/plugins/httpjson/httpjson.go @@ -15,16 +15,12 @@ import ( ) type HttpJson struct { - Services []Service - client HTTPClient -} - -type Service struct { Name string Servers []string Method string TagKeys []string Parameters map[string]string + client HTTPClient } type HTTPClient interface { @@ -48,31 +44,28 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { } var sampleConfig = ` - # Specify services via an array of tables - [[plugins.httpjson.services]] + # a name for the service being polled + name = "webserver_stats" - # a name for the service being polled - name = "webserver_stats" + # URL of each server in the service's cluster + servers = [ + "http://localhost:9999/stats/", + "http://localhost:9998/stats/", + ] - # URL of each server in the service's cluster - servers = [ - "http://localhost:9999/stats/", - "http://localhost:9998/stats/", - ] + # HTTP method to use (case-sensitive) + method = "GET" - # HTTP method to use (case-sensitive) - method = "GET" + # List of tag names to extract from top-level of JSON server response + # tag_keys = [ + # "my_tag_1", + # "my_tag_2" + # ] - # List of tag names to extract from top-level of JSON server response - # tag_keys = [ - # "my_tag_1", - # "my_tag_2" - # ] - - # HTTP parameters (all values must be strings) - [plugins.httpjson.services.parameters] - event_type = "cpu_spike" - threshold = "0.75" + # HTTP parameters (all values must be strings) + [plugins.httpjson.parameters] + event_type = "cpu_spike" + threshold = "0.75" ` func (h *HttpJson) SampleConfig() string { @@ -87,22 +80,16 @@ func (h *HttpJson) Description() string { func (h *HttpJson) Gather(acc plugins.Accumulator) error { var wg sync.WaitGroup - totalServers := 0 - for _, service := range h.Services { - totalServers += len(service.Servers) - } - errorChannel := make(chan error, totalServers) + errorChannel := make(chan error, len(h.Servers)) - for _, service := range h.Services { - for _, server := range service.Servers { - wg.Add(1) - go func(service Service, server string) { - defer wg.Done() - if err := h.gatherServer(acc, service, server); err != nil { - errorChannel <- err - } - }(service, server) - } + for _, server := range h.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + if err := h.gatherServer(acc, server); err != nil { + errorChannel <- err + } + }(server) } wg.Wait() @@ -130,10 +117,9 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error { // error: Any error that may have occurred func (h *HttpJson) gatherServer( acc plugins.Accumulator, - service Service, serverURL string, ) error { - resp, err := h.sendRequest(service, serverURL) + resp, err := h.sendRequest(serverURL) if err != nil { return err } @@ -147,7 +133,7 @@ func (h *HttpJson) gatherServer( "server": serverURL, } - for _, tag := range service.TagKeys { + for _, tag := range h.TagKeys { switch v := jsonOut[tag].(type) { case string: tags[tag] = v @@ -162,10 +148,10 @@ func (h *HttpJson) gatherServer( } var msrmnt_name string - if service.Name == "" { + if h.Name == "" { msrmnt_name = "httpjson" } else { - msrmnt_name = "httpjson_" + service.Name + msrmnt_name = "httpjson_" + h.Name } acc.AddFields(msrmnt_name, f.Fields, nil) return nil @@ -178,7 +164,7 @@ func (h *HttpJson) gatherServer( // Returns: // string: body of the response // error : Any error that may have occurred -func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error) { +func (h *HttpJson) sendRequest(serverURL string) (string, error) { // Prepare URL requestURL, err := url.Parse(serverURL) if err != nil { @@ -186,21 +172,23 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error } params := url.Values{} - for k, v := range service.Parameters { + for k, v := range h.Parameters { params.Add(k, v) } requestURL.RawQuery = params.Encode() // Create + send request - req, err := http.NewRequest(service.Method, requestURL.String(), nil) + req, err := http.NewRequest(h.Method, requestURL.String(), nil) if err != nil { return "", err } + defer req.Body.Close() resp, err := h.client.MakeRequest(req) if err != nil { return "", err } + defer resp.Body.Close() defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index c3241a892..8bebbc3c5 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -62,17 +62,6 @@ func (j *Jolokia) SampleConfig() string { [[plugins.jolokia.metrics]] name = "heap_memory_usage" jmx = "/java.lang:type=Memory/HeapMemoryUsage" - - - # This drops the 'committed' value from Eden space measurement - [[plugins.jolokia.metrics]] - name = "memory_eden" - jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" - - # This passes only DaemonThreadCount and ThreadCount - [[plugins.jolokia.metrics]] - name = "heap_threads" - jmx = "/java.lang:type=Threading" ` }