0.3.0 Removing internal parallelism: twemproxy and rabbitmq
This commit is contained in:
parent
c8914679b7
commit
2e764cb22d
|
@ -14,17 +14,13 @@ const DefaultUsername = "guest"
|
||||||
const DefaultPassword = "guest"
|
const DefaultPassword = "guest"
|
||||||
const DefaultURL = "http://localhost:15672"
|
const DefaultURL = "http://localhost:15672"
|
||||||
|
|
||||||
type Server struct {
|
type RabbitMQ struct {
|
||||||
URL string
|
URL string
|
||||||
Name string
|
Name string
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
Nodes []string
|
Nodes []string
|
||||||
Queues []string
|
Queues []string
|
||||||
}
|
|
||||||
|
|
||||||
type RabbitMQ struct {
|
|
||||||
Servers []*Server
|
|
||||||
|
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
}
|
}
|
||||||
|
@ -95,15 +91,13 @@ type Node struct {
|
||||||
SocketsUsed int64 `json:"sockets_used"`
|
SocketsUsed int64 `json:"sockets_used"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error)
|
type gatherFunc func(r *RabbitMQ, acc plugins.Accumulator, errChan chan error)
|
||||||
|
|
||||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
# Specify servers via an array of tables
|
url = "http://localhost:15672" # required
|
||||||
[[plugins.rabbitmq.servers]]
|
|
||||||
# name = "rmq-server-1" # optional tag
|
# name = "rmq-server-1" # optional tag
|
||||||
# url = "http://localhost:15672"
|
|
||||||
# username = "guest"
|
# username = "guest"
|
||||||
# password = "guest"
|
# password = "guest"
|
||||||
|
|
||||||
|
@ -120,27 +114,18 @@ func (r *RabbitMQ) Description() string {
|
||||||
return "Read metrics from one or many RabbitMQ servers via the management API"
|
return "Read metrics from one or many RabbitMQ servers via the management API"
|
||||||
}
|
}
|
||||||
|
|
||||||
var localhost = &Server{URL: DefaultURL}
|
|
||||||
|
|
||||||
func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
|
func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
|
||||||
if r.Client == nil {
|
if r.Client == nil {
|
||||||
r.Client = &http.Client{}
|
r.Client = &http.Client{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var errChan = make(chan error, len(r.Servers))
|
var errChan = make(chan error, len(gatherFunctions))
|
||||||
|
|
||||||
// use localhost is no servers are specified in config
|
for _, f := range gatherFunctions {
|
||||||
if len(r.Servers) == 0 {
|
go f(r, acc, errChan)
|
||||||
r.Servers = append(r.Servers, localhost)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, serv := range r.Servers {
|
for i := 1; i <= len(gatherFunctions); i++ {
|
||||||
for _, f := range gatherFunctions {
|
|
||||||
go f(r, serv, acc, errChan)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ {
|
|
||||||
err := <-errChan
|
err := <-errChan
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -150,20 +135,20 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error {
|
func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
|
||||||
u = fmt.Sprintf("%s%s", serv.URL, u)
|
u = fmt.Sprintf("%s%s", r.URL, u)
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", u, nil)
|
req, err := http.NewRequest("GET", u, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
username := serv.Username
|
username := r.Username
|
||||||
if username == "" {
|
if username == "" {
|
||||||
username = DefaultUsername
|
username = DefaultUsername
|
||||||
}
|
}
|
||||||
|
|
||||||
password := serv.Password
|
password := r.Password
|
||||||
if password == "" {
|
if password == "" {
|
||||||
password = DefaultPassword
|
password = DefaultPassword
|
||||||
}
|
}
|
||||||
|
@ -182,10 +167,10 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
|
func gatherOverview(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
|
||||||
overview := &OverviewResponse{}
|
overview := &OverviewResponse{}
|
||||||
|
|
||||||
err := r.requestJSON(serv, "/api/overview", &overview)
|
err := r.requestJSON("/api/overview", &overview)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
|
@ -196,9 +181,9 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"url": serv.URL}
|
tags := map[string]string{"url": r.URL}
|
||||||
if serv.Name != "" {
|
if r.Name != "" {
|
||||||
tags["name"] = serv.Name
|
tags["name"] = r.Name
|
||||||
}
|
}
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
"messages": overview.QueueTotals.Messages,
|
"messages": overview.QueueTotals.Messages,
|
||||||
|
@ -218,10 +203,10 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
|
func gatherNodes(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
|
||||||
nodes := make([]Node, 0)
|
nodes := make([]Node, 0)
|
||||||
// Gather information about nodes
|
// Gather information about nodes
|
||||||
err := r.requestJSON(serv, "/api/nodes", &nodes)
|
err := r.requestJSON("/api/nodes", &nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
|
@ -229,11 +214,11 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
if !shouldGatherNode(node, serv) {
|
if !r.shouldGatherNode(node) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"url": serv.URL}
|
tags := map[string]string{"url": r.URL}
|
||||||
tags["node"] = node.Name
|
tags["node"] = node.Name
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
|
@ -255,21 +240,21 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
|
func gatherQueues(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
|
||||||
// Gather information about queues
|
// Gather information about queues
|
||||||
queues := make([]Queue, 0)
|
queues := make([]Queue, 0)
|
||||||
err := r.requestJSON(serv, "/api/queues", &queues)
|
err := r.requestJSON("/api/queues", &queues)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, queue := range queues {
|
for _, queue := range queues {
|
||||||
if !shouldGatherQueue(queue, serv) {
|
if !r.shouldGatherQueue(queue) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"url": serv.URL,
|
"url": r.URL,
|
||||||
"queue": queue.Name,
|
"queue": queue.Name,
|
||||||
"vhost": queue.Vhost,
|
"vhost": queue.Vhost,
|
||||||
"node": queue.Node,
|
"node": queue.Node,
|
||||||
|
@ -306,12 +291,12 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldGatherNode(node Node, serv *Server) bool {
|
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
||||||
if len(serv.Nodes) == 0 {
|
if len(r.Nodes) == 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, name := range serv.Nodes {
|
for _, name := range r.Nodes {
|
||||||
if name == node.Name {
|
if name == node.Name {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -320,12 +305,12 @@ func shouldGatherNode(node Node, serv *Server) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldGatherQueue(queue Queue, serv *Server) bool {
|
func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
||||||
if len(serv.Queues) == 0 {
|
if len(r.Queues) == 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, name := range serv.Queues {
|
for _, name := range r.Queues {
|
||||||
if name == queue.Name {
|
if name == queue.Name {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,28 +5,21 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/telegraf/plugins"
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Twemproxy struct {
|
type Twemproxy struct {
|
||||||
Instances []TwemproxyInstance
|
|
||||||
}
|
|
||||||
|
|
||||||
type TwemproxyInstance struct {
|
|
||||||
Addr string
|
Addr string
|
||||||
Pools []string
|
Pools []string
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
[[plugins.twemproxy.instances]]
|
# Twemproxy stats address and port (no scheme)
|
||||||
# Twemproxy stats address and port (no scheme)
|
addr = "localhost:22222"
|
||||||
addr = "localhost:22222"
|
# Monitor pool name
|
||||||
# Monitor pool name
|
pools = ["redis_pool", "mc_pool"]
|
||||||
pools = ["redis_pool", "mc_pool"]
|
|
||||||
`
|
`
|
||||||
|
|
||||||
func (t *Twemproxy) SampleConfig() string {
|
func (t *Twemproxy) SampleConfig() string {
|
||||||
|
@ -39,35 +32,7 @@ func (t *Twemproxy) Description() string {
|
||||||
|
|
||||||
// Gather data from all Twemproxy instances
|
// Gather data from all Twemproxy instances
|
||||||
func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
|
func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
|
||||||
errorChan := make(chan error, len(t.Instances))
|
|
||||||
for _, inst := range t.Instances {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(inst TwemproxyInstance) {
|
|
||||||
defer wg.Done()
|
|
||||||
if err := inst.Gather(acc); err != nil {
|
|
||||||
errorChan <- err
|
|
||||||
}
|
|
||||||
}(inst)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
close(errorChan)
|
|
||||||
errs := []string{}
|
|
||||||
for err := range errorChan {
|
|
||||||
errs = append(errs, err.Error())
|
|
||||||
}
|
|
||||||
if len(errs) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return errors.New(strings.Join(errs, "\n"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gather data from one Twemproxy
|
|
||||||
func (ti *TwemproxyInstance) Gather(
|
|
||||||
acc plugins.Accumulator,
|
|
||||||
) error {
|
|
||||||
conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -82,14 +47,14 @@ func (ti *TwemproxyInstance) Gather(
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
tags["twemproxy"] = ti.Addr
|
tags["twemproxy"] = t.Addr
|
||||||
ti.processStat(acc, tags, stats)
|
t.processStat(acc, tags, stats)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process Twemproxy server stats
|
// Process Twemproxy server stats
|
||||||
func (ti *TwemproxyInstance) processStat(
|
func (t *Twemproxy) processStat(
|
||||||
acc plugins.Accumulator,
|
acc plugins.Accumulator,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
data map[string]interface{},
|
data map[string]interface{},
|
||||||
|
@ -111,19 +76,19 @@ func (ti *TwemproxyInstance) processStat(
|
||||||
}
|
}
|
||||||
acc.AddFields("twemproxy", fields, tags)
|
acc.AddFields("twemproxy", fields, tags)
|
||||||
|
|
||||||
for _, pool := range ti.Pools {
|
for _, pool := range t.Pools {
|
||||||
if poolStat, ok := data[pool]; ok {
|
if poolStat, ok := data[pool]; ok {
|
||||||
if data, ok := poolStat.(map[string]interface{}); ok {
|
if data, ok := poolStat.(map[string]interface{}); ok {
|
||||||
poolTags := copyTags(tags)
|
poolTags := copyTags(tags)
|
||||||
poolTags["pool"] = pool
|
poolTags["pool"] = pool
|
||||||
ti.processPool(acc, poolTags, data)
|
t.processPool(acc, poolTags, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process pool data in Twemproxy stats
|
// Process pool data in Twemproxy stats
|
||||||
func (ti *TwemproxyInstance) processPool(
|
func (t *Twemproxy) processPool(
|
||||||
acc plugins.Accumulator,
|
acc plugins.Accumulator,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
data map[string]interface{},
|
data map[string]interface{},
|
||||||
|
@ -143,7 +108,7 @@ func (ti *TwemproxyInstance) processPool(
|
||||||
serverTags[key] = copyTags(tags)
|
serverTags[key] = copyTags(tags)
|
||||||
serverTags[key]["server"] = key
|
serverTags[key]["server"] = key
|
||||||
}
|
}
|
||||||
ti.processServer(acc, serverTags[key], data)
|
t.processServer(acc, serverTags[key], data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -151,7 +116,7 @@ func (ti *TwemproxyInstance) processPool(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process backend server(redis/memcached) stats
|
// Process backend server(redis/memcached) stats
|
||||||
func (ti *TwemproxyInstance) processServer(
|
func (t *Twemproxy) processServer(
|
||||||
acc plugins.Accumulator,
|
acc plugins.Accumulator,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
data map[string]interface{},
|
data map[string]interface{},
|
||||||
|
|
Loading…
Reference in New Issue