From 28007557cb5407eb4d4b448525e1aec7cf42a137 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sat, 19 Dec 2015 20:26:18 -0700 Subject: [PATCH] 0.3.0 Removing internal parallelism: twemproxy and rabbitmq --- plugins/rabbitmq/rabbitmq.go | 75 ++++++++++++++-------------------- plugins/twemproxy/twemproxy.go | 61 ++++++--------------------- 2 files changed, 43 insertions(+), 93 deletions(-) diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go index 7101ab431..227811bc8 100644 --- a/plugins/rabbitmq/rabbitmq.go +++ b/plugins/rabbitmq/rabbitmq.go @@ -14,17 +14,13 @@ const DefaultUsername = "guest" const DefaultPassword = "guest" const DefaultURL = "http://localhost:15672" -type Server struct { +type RabbitMQ struct { URL string Name string Username string Password string Nodes []string Queues []string -} - -type RabbitMQ struct { - Servers []*Server Client *http.Client } @@ -95,15 +91,13 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } -type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) +type gatherFunc func(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var sampleConfig = ` - # Specify servers via an array of tables - [[plugins.rabbitmq.servers]] + url = "http://localhost:15672" # required # name = "rmq-server-1" # optional tag - # url = "http://localhost:15672" # username = "guest" # password = "guest" @@ -120,27 +114,18 @@ func (r *RabbitMQ) Description() string { return "Read metrics from one or many RabbitMQ servers via the management API" } -var localhost = &Server{URL: DefaultURL} - func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { if r.Client == nil { r.Client = &http.Client{} } - var errChan = make(chan error, len(r.Servers)) + var errChan = make(chan error, len(gatherFunctions)) - // use localhost is no servers are specified in config - if len(r.Servers) == 0 { - r.Servers = append(r.Servers, localhost) + for _, f := range gatherFunctions { + go f(r, acc, errChan) } - for _, serv := range r.Servers { - for _, f := range gatherFunctions { - go f(r, serv, acc, errChan) - } - } - - for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ { + for i := 1; i <= len(gatherFunctions); i++ { err := <-errChan if err != nil { return err @@ -150,20 +135,20 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { return nil } -func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error { - u = fmt.Sprintf("%s%s", serv.URL, u) +func (r *RabbitMQ) requestJSON(u string, target interface{}) error { + u = fmt.Sprintf("%s%s", r.URL, u) req, err := http.NewRequest("GET", u, nil) if err != nil { return err } - username := serv.Username + username := r.Username if username == "" { username = DefaultUsername } - password := serv.Password + password := r.Password if password == "" { password = DefaultPassword } @@ -182,10 +167,10 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error return nil } -func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { +func gatherOverview(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) { overview := &OverviewResponse{} - err := r.requestJSON(serv, "/api/overview", &overview) + err := r.requestJSON("/api/overview", &overview) if err != nil { errChan <- err return @@ -196,9 +181,9 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan return } - tags := map[string]string{"url": serv.URL} - if serv.Name != "" { - tags["name"] = serv.Name + tags := map[string]string{"url": r.URL} + if r.Name != "" { + tags["name"] = r.Name } fields := map[string]interface{}{ "messages": overview.QueueTotals.Messages, @@ -218,10 +203,10 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan errChan <- nil } -func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { +func gatherNodes(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) { nodes := make([]Node, 0) // Gather information about nodes - err := r.requestJSON(serv, "/api/nodes", &nodes) + err := r.requestJSON("/api/nodes", &nodes) if err != nil { errChan <- err return @@ -229,11 +214,11 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha now := time.Now() for _, node := range nodes { - if !shouldGatherNode(node, serv) { + if !r.shouldGatherNode(node) { continue } - tags := map[string]string{"url": serv.URL} + tags := map[string]string{"url": r.URL} tags["node"] = node.Name fields := map[string]interface{}{ @@ -255,21 +240,21 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha errChan <- nil } -func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { +func gatherQueues(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) { // Gather information about queues queues := make([]Queue, 0) - err := r.requestJSON(serv, "/api/queues", &queues) + err := r.requestJSON("/api/queues", &queues) if err != nil { errChan <- err return } for _, queue := range queues { - if !shouldGatherQueue(queue, serv) { + if !r.shouldGatherQueue(queue) { continue } tags := map[string]string{ - "url": serv.URL, + "url": r.URL, "queue": queue.Name, "vhost": queue.Vhost, "node": queue.Node, @@ -306,12 +291,12 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch errChan <- nil } -func shouldGatherNode(node Node, serv *Server) bool { - if len(serv.Nodes) == 0 { +func (r *RabbitMQ) shouldGatherNode(node Node) bool { + if len(r.Nodes) == 0 { return true } - for _, name := range serv.Nodes { + for _, name := range r.Nodes { if name == node.Name { return true } @@ -320,12 +305,12 @@ func shouldGatherNode(node Node, serv *Server) bool { return false } -func shouldGatherQueue(queue Queue, serv *Server) bool { - if len(serv.Queues) == 0 { +func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { + if len(r.Queues) == 0 { return true } - for _, name := range serv.Queues { + for _, name := range r.Queues { if name == queue.Name { return true } diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go index 1933e8d0d..fe3fb6de5 100644 --- a/plugins/twemproxy/twemproxy.go +++ b/plugins/twemproxy/twemproxy.go @@ -5,28 +5,21 @@ import ( "errors" "io/ioutil" "net" - "strings" - "sync" "time" "github.com/influxdb/telegraf/plugins" ) type Twemproxy struct { - Instances []TwemproxyInstance -} - -type TwemproxyInstance struct { Addr string Pools []string } var sampleConfig = ` - [[plugins.twemproxy.instances]] - # Twemproxy stats address and port (no scheme) - addr = "localhost:22222" - # Monitor pool name - pools = ["redis_pool", "mc_pool"] + # Twemproxy stats address and port (no scheme) + addr = "localhost:22222" + # Monitor pool name + pools = ["redis_pool", "mc_pool"] ` func (t *Twemproxy) SampleConfig() string { @@ -39,35 +32,7 @@ func (t *Twemproxy) Description() string { // Gather data from all Twemproxy instances func (t *Twemproxy) Gather(acc plugins.Accumulator) error { - var wg sync.WaitGroup - errorChan := make(chan error, len(t.Instances)) - for _, inst := range t.Instances { - wg.Add(1) - go func(inst TwemproxyInstance) { - defer wg.Done() - if err := inst.Gather(acc); err != nil { - errorChan <- err - } - }(inst) - } - wg.Wait() - - close(errorChan) - errs := []string{} - for err := range errorChan { - errs = append(errs, err.Error()) - } - if len(errs) == 0 { - return nil - } - return errors.New(strings.Join(errs, "\n")) -} - -// Gather data from one Twemproxy -func (ti *TwemproxyInstance) Gather( - acc plugins.Accumulator, -) error { - conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second) + conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second) if err != nil { return err } @@ -82,14 +47,14 @@ func (ti *TwemproxyInstance) Gather( } tags := make(map[string]string) - tags["twemproxy"] = ti.Addr - ti.processStat(acc, tags, stats) + tags["twemproxy"] = t.Addr + t.processStat(acc, tags, stats) return nil } // Process Twemproxy server stats -func (ti *TwemproxyInstance) processStat( +func (t *Twemproxy) processStat( acc plugins.Accumulator, tags map[string]string, data map[string]interface{}, @@ -111,19 +76,19 @@ func (ti *TwemproxyInstance) processStat( } acc.AddFields("twemproxy", fields, tags) - for _, pool := range ti.Pools { + for _, pool := range t.Pools { if poolStat, ok := data[pool]; ok { if data, ok := poolStat.(map[string]interface{}); ok { poolTags := copyTags(tags) poolTags["pool"] = pool - ti.processPool(acc, poolTags, data) + t.processPool(acc, poolTags, data) } } } } // Process pool data in Twemproxy stats -func (ti *TwemproxyInstance) processPool( +func (t *Twemproxy) processPool( acc plugins.Accumulator, tags map[string]string, data map[string]interface{}, @@ -143,7 +108,7 @@ func (ti *TwemproxyInstance) processPool( serverTags[key] = copyTags(tags) serverTags[key]["server"] = key } - ti.processServer(acc, serverTags[key], data) + t.processServer(acc, serverTags[key], data) } } } @@ -151,7 +116,7 @@ func (ti *TwemproxyInstance) processPool( } // Process backend server(redis/memcached) stats -func (ti *TwemproxyInstance) processServer( +func (t *Twemproxy) processServer( acc plugins.Accumulator, tags map[string]string, data map[string]interface{},