0.3.0 Removing internal parallelism: twemproxy and rabbitmq

This commit is contained in:
Cameron Sparr 2015-12-19 20:26:18 -07:00
parent 148b93bafb
commit 28007557cb
2 changed files with 43 additions and 93 deletions

View File

@ -14,17 +14,13 @@ const DefaultUsername = "guest"
const DefaultPassword = "guest" const DefaultPassword = "guest"
const DefaultURL = "http://localhost:15672" const DefaultURL = "http://localhost:15672"
type Server struct { type RabbitMQ struct {
URL string URL string
Name string Name string
Username string Username string
Password string Password string
Nodes []string Nodes []string
Queues []string Queues []string
}
type RabbitMQ struct {
Servers []*Server
Client *http.Client Client *http.Client
} }
@ -95,15 +91,13 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"` SocketsUsed int64 `json:"sockets_used"`
} }
type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) type gatherFunc func(r *RabbitMQ, acc plugins.Accumulator, errChan chan error)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = ` var sampleConfig = `
# Specify servers via an array of tables url = "http://localhost:15672" # required
[[plugins.rabbitmq.servers]]
# name = "rmq-server-1" # optional tag # name = "rmq-server-1" # optional tag
# url = "http://localhost:15672"
# username = "guest" # username = "guest"
# password = "guest" # password = "guest"
@ -120,27 +114,18 @@ func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API" return "Read metrics from one or many RabbitMQ servers via the management API"
} }
var localhost = &Server{URL: DefaultURL}
func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
if r.Client == nil { if r.Client == nil {
r.Client = &http.Client{} r.Client = &http.Client{}
} }
var errChan = make(chan error, len(r.Servers)) var errChan = make(chan error, len(gatherFunctions))
// use localhost is no servers are specified in config
if len(r.Servers) == 0 {
r.Servers = append(r.Servers, localhost)
}
for _, serv := range r.Servers {
for _, f := range gatherFunctions { for _, f := range gatherFunctions {
go f(r, serv, acc, errChan) go f(r, acc, errChan)
}
} }
for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ { for i := 1; i <= len(gatherFunctions); i++ {
err := <-errChan err := <-errChan
if err != nil { if err != nil {
return err return err
@ -150,20 +135,20 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
return nil return nil
} }
func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error { func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
u = fmt.Sprintf("%s%s", serv.URL, u) u = fmt.Sprintf("%s%s", r.URL, u)
req, err := http.NewRequest("GET", u, nil) req, err := http.NewRequest("GET", u, nil)
if err != nil { if err != nil {
return err return err
} }
username := serv.Username username := r.Username
if username == "" { if username == "" {
username = DefaultUsername username = DefaultUsername
} }
password := serv.Password password := r.Password
if password == "" { if password == "" {
password = DefaultPassword password = DefaultPassword
} }
@ -182,10 +167,10 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error
return nil return nil
} }
func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherOverview(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
overview := &OverviewResponse{} overview := &OverviewResponse{}
err := r.requestJSON(serv, "/api/overview", &overview) err := r.requestJSON("/api/overview", &overview)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
@ -196,9 +181,9 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
return return
} }
tags := map[string]string{"url": serv.URL} tags := map[string]string{"url": r.URL}
if serv.Name != "" { if r.Name != "" {
tags["name"] = serv.Name tags["name"] = r.Name
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
"messages": overview.QueueTotals.Messages, "messages": overview.QueueTotals.Messages,
@ -218,10 +203,10 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
errChan <- nil errChan <- nil
} }
func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherNodes(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
nodes := make([]Node, 0) nodes := make([]Node, 0)
// Gather information about nodes // Gather information about nodes
err := r.requestJSON(serv, "/api/nodes", &nodes) err := r.requestJSON("/api/nodes", &nodes)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
@ -229,11 +214,11 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
now := time.Now() now := time.Now()
for _, node := range nodes { for _, node := range nodes {
if !shouldGatherNode(node, serv) { if !r.shouldGatherNode(node) {
continue continue
} }
tags := map[string]string{"url": serv.URL} tags := map[string]string{"url": r.URL}
tags["node"] = node.Name tags["node"] = node.Name
fields := map[string]interface{}{ fields := map[string]interface{}{
@ -255,21 +240,21 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
errChan <- nil errChan <- nil
} }
func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherQueues(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
// Gather information about queues // Gather information about queues
queues := make([]Queue, 0) queues := make([]Queue, 0)
err := r.requestJSON(serv, "/api/queues", &queues) err := r.requestJSON("/api/queues", &queues)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
for _, queue := range queues { for _, queue := range queues {
if !shouldGatherQueue(queue, serv) { if !r.shouldGatherQueue(queue) {
continue continue
} }
tags := map[string]string{ tags := map[string]string{
"url": serv.URL, "url": r.URL,
"queue": queue.Name, "queue": queue.Name,
"vhost": queue.Vhost, "vhost": queue.Vhost,
"node": queue.Node, "node": queue.Node,
@ -306,12 +291,12 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
errChan <- nil errChan <- nil
} }
func shouldGatherNode(node Node, serv *Server) bool { func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(serv.Nodes) == 0 { if len(r.Nodes) == 0 {
return true return true
} }
for _, name := range serv.Nodes { for _, name := range r.Nodes {
if name == node.Name { if name == node.Name {
return true return true
} }
@ -320,12 +305,12 @@ func shouldGatherNode(node Node, serv *Server) bool {
return false return false
} }
func shouldGatherQueue(queue Queue, serv *Server) bool { func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
if len(serv.Queues) == 0 { if len(r.Queues) == 0 {
return true return true
} }
for _, name := range serv.Queues { for _, name := range r.Queues {
if name == queue.Name { if name == queue.Name {
return true return true
} }

View File

@ -5,24 +5,17 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"net" "net"
"strings"
"sync"
"time" "time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type Twemproxy struct { type Twemproxy struct {
Instances []TwemproxyInstance
}
type TwemproxyInstance struct {
Addr string Addr string
Pools []string Pools []string
} }
var sampleConfig = ` var sampleConfig = `
[[plugins.twemproxy.instances]]
# Twemproxy stats address and port (no scheme) # Twemproxy stats address and port (no scheme)
addr = "localhost:22222" addr = "localhost:22222"
# Monitor pool name # Monitor pool name
@ -39,35 +32,7 @@ func (t *Twemproxy) Description() string {
// Gather data from all Twemproxy instances // Gather data from all Twemproxy instances
func (t *Twemproxy) Gather(acc plugins.Accumulator) error { func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
errorChan := make(chan error, len(t.Instances))
for _, inst := range t.Instances {
wg.Add(1)
go func(inst TwemproxyInstance) {
defer wg.Done()
if err := inst.Gather(acc); err != nil {
errorChan <- err
}
}(inst)
}
wg.Wait()
close(errorChan)
errs := []string{}
for err := range errorChan {
errs = append(errs, err.Error())
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, "\n"))
}
// Gather data from one Twemproxy
func (ti *TwemproxyInstance) Gather(
acc plugins.Accumulator,
) error {
conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second)
if err != nil { if err != nil {
return err return err
} }
@ -82,14 +47,14 @@ func (ti *TwemproxyInstance) Gather(
} }
tags := make(map[string]string) tags := make(map[string]string)
tags["twemproxy"] = ti.Addr tags["twemproxy"] = t.Addr
ti.processStat(acc, tags, stats) t.processStat(acc, tags, stats)
return nil return nil
} }
// Process Twemproxy server stats // Process Twemproxy server stats
func (ti *TwemproxyInstance) processStat( func (t *Twemproxy) processStat(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
data map[string]interface{}, data map[string]interface{},
@ -111,19 +76,19 @@ func (ti *TwemproxyInstance) processStat(
} }
acc.AddFields("twemproxy", fields, tags) acc.AddFields("twemproxy", fields, tags)
for _, pool := range ti.Pools { for _, pool := range t.Pools {
if poolStat, ok := data[pool]; ok { if poolStat, ok := data[pool]; ok {
if data, ok := poolStat.(map[string]interface{}); ok { if data, ok := poolStat.(map[string]interface{}); ok {
poolTags := copyTags(tags) poolTags := copyTags(tags)
poolTags["pool"] = pool poolTags["pool"] = pool
ti.processPool(acc, poolTags, data) t.processPool(acc, poolTags, data)
} }
} }
} }
} }
// Process pool data in Twemproxy stats // Process pool data in Twemproxy stats
func (ti *TwemproxyInstance) processPool( func (t *Twemproxy) processPool(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
data map[string]interface{}, data map[string]interface{},
@ -143,7 +108,7 @@ func (ti *TwemproxyInstance) processPool(
serverTags[key] = copyTags(tags) serverTags[key] = copyTags(tags)
serverTags[key]["server"] = key serverTags[key]["server"] = key
} }
ti.processServer(acc, serverTags[key], data) t.processServer(acc, serverTags[key], data)
} }
} }
} }
@ -151,7 +116,7 @@ func (ti *TwemproxyInstance) processPool(
} }
// Process backend server(redis/memcached) stats // Process backend server(redis/memcached) stats
func (ti *TwemproxyInstance) processServer( func (t *Twemproxy) processServer(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
data map[string]interface{}, data map[string]interface{},