527 lines
14 KiB
Go
527 lines
14 KiB
Go
package rabbitmq
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/filter"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/internal/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
// DefaultUsername will set a default value that corrasponds to the default
|
|
// value used by Rabbitmq
|
|
const DefaultUsername = "guest"
|
|
|
|
// DefaultPassword will set a default value that corrasponds to the default
|
|
// value used by Rabbitmq
|
|
const DefaultPassword = "guest"
|
|
|
|
// DefaultURL will set a default value that corrasponds to the default value
|
|
// used by Rabbitmq
|
|
const DefaultURL = "http://localhost:15672"
|
|
|
|
// Default http timeouts
|
|
const DefaultResponseHeaderTimeout = 3
|
|
const DefaultClientTimeout = 4
|
|
|
|
// RabbitMQ defines the configuration necessary for gathering metrics,
|
|
// see the sample config for further details
|
|
type RabbitMQ struct {
|
|
URL string
|
|
Name string
|
|
Username string
|
|
Password string
|
|
tls.ClientConfig
|
|
|
|
ResponseHeaderTimeout internal.Duration `toml:"header_timeout"`
|
|
ClientTimeout internal.Duration `toml:"client_timeout"`
|
|
|
|
Nodes []string
|
|
Queues []string
|
|
Exchanges []string
|
|
|
|
QueueInclude []string `toml:"queue_name_include"`
|
|
QueueExclude []string `toml:"queue_name_exclude"`
|
|
|
|
Client *http.Client
|
|
|
|
filterCreated bool
|
|
excludeEveryQueue bool
|
|
queueFilter filter.Filter
|
|
}
|
|
|
|
// OverviewResponse ...
|
|
type OverviewResponse struct {
|
|
MessageStats *MessageStats `json:"message_stats"`
|
|
ObjectTotals *ObjectTotals `json:"object_totals"`
|
|
QueueTotals *QueueTotals `json:"queue_totals"`
|
|
Listeners []Listeners `json:"listeners"`
|
|
}
|
|
|
|
// Listeners ...
|
|
type Listeners struct {
|
|
Protocol string `json:"protocol"`
|
|
}
|
|
|
|
// Details ...
|
|
type Details struct {
|
|
Rate float64
|
|
}
|
|
|
|
// MessageStats ...
|
|
type MessageStats struct {
|
|
Ack int64
|
|
AckDetails Details `json:"ack_details"`
|
|
Deliver int64
|
|
DeliverDetails Details `json:"deliver_details"`
|
|
DeliverGet int64 `json:"deliver_get"`
|
|
DeliverGetDetails Details `json:"deliver_get_details"`
|
|
Publish int64
|
|
PublishDetails Details `json:"publish_details"`
|
|
Redeliver int64
|
|
RedeliverDetails Details `json:"redeliver_details"`
|
|
PublishIn int64 `json:"publish_in"`
|
|
PublishOut int64 `json:"publish_out"`
|
|
}
|
|
|
|
// ObjectTotals ...
|
|
type ObjectTotals struct {
|
|
Channels int64
|
|
Connections int64
|
|
Consumers int64
|
|
Exchanges int64
|
|
Queues int64
|
|
}
|
|
|
|
// QueueTotals ...
|
|
type QueueTotals struct {
|
|
Messages int64
|
|
MessagesReady int64 `json:"messages_ready"`
|
|
MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
|
|
MessageBytes int64 `json:"message_bytes"`
|
|
MessageBytesReady int64 `json:"message_bytes_ready"`
|
|
MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"`
|
|
MessageRAM int64 `json:"message_bytes_ram"`
|
|
MessagePersistent int64 `json:"message_bytes_persistent"`
|
|
}
|
|
|
|
// Queue ...
|
|
type Queue struct {
|
|
QueueTotals // just to not repeat the same code
|
|
MessageStats `json:"message_stats"`
|
|
Memory int64
|
|
Consumers int64
|
|
ConsumerUtilisation float64 `json:"consumer_utilisation"`
|
|
Name string
|
|
Node string
|
|
Vhost string
|
|
Durable bool
|
|
AutoDelete bool `json:"auto_delete"`
|
|
IdleSince string `json:"idle_since"`
|
|
}
|
|
|
|
// Node ...
|
|
type Node struct {
|
|
Name string
|
|
|
|
DiskFree int64 `json:"disk_free"`
|
|
DiskFreeLimit int64 `json:"disk_free_limit"`
|
|
FdTotal int64 `json:"fd_total"`
|
|
FdUsed int64 `json:"fd_used"`
|
|
MemLimit int64 `json:"mem_limit"`
|
|
MemUsed int64 `json:"mem_used"`
|
|
ProcTotal int64 `json:"proc_total"`
|
|
ProcUsed int64 `json:"proc_used"`
|
|
RunQueue int64 `json:"run_queue"`
|
|
SocketsTotal int64 `json:"sockets_total"`
|
|
SocketsUsed int64 `json:"sockets_used"`
|
|
Running bool `json:"running"`
|
|
}
|
|
|
|
type Exchange struct {
|
|
Name string
|
|
MessageStats `json:"message_stats"`
|
|
Type string
|
|
Internal bool
|
|
Vhost string
|
|
Durable bool
|
|
AutoDelete bool `json:"auto_delete"`
|
|
}
|
|
|
|
// gatherFunc ...
|
|
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
|
|
|
|
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}
|
|
|
|
var sampleConfig = `
|
|
## Management Plugin url. (default: http://localhost:15672)
|
|
# url = "http://localhost:15672"
|
|
## Tag added to rabbitmq_overview series; deprecated: use tags
|
|
# name = "rmq-server-1"
|
|
## Credentials
|
|
# username = "guest"
|
|
# password = "guest"
|
|
|
|
## Optional TLS Config
|
|
# tls_ca = "/etc/telegraf/ca.pem"
|
|
# tls_cert = "/etc/telegraf/cert.pem"
|
|
# tls_key = "/etc/telegraf/key.pem"
|
|
## Use TLS but skip chain & host verification
|
|
# insecure_skip_verify = false
|
|
|
|
## Optional request timeouts
|
|
##
|
|
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
|
|
## for a server's response headers after fully writing the request.
|
|
# header_timeout = "3s"
|
|
##
|
|
## client_timeout specifies a time limit for requests made by this client.
|
|
## Includes connection time, any redirects, and reading the response body.
|
|
# client_timeout = "4s"
|
|
|
|
## A list of nodes to gather as the rabbitmq_node measurement. If not
|
|
## specified, metrics for all nodes are gathered.
|
|
# nodes = ["rabbit@node1", "rabbit@node2"]
|
|
|
|
## A list of queues to gather as the rabbitmq_queue measurement. If not
|
|
## specified, metrics for all queues are gathered.
|
|
# queues = ["telegraf"]
|
|
|
|
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
|
## specified, metrics for all exchanges are gathered.
|
|
# exchanges = ["telegraf"]
|
|
|
|
## Queues to include and exclude. Globs accepted.
|
|
## Note that an empty array for both will include all queues
|
|
queue_name_include = []
|
|
queue_name_exclude = []
|
|
`
|
|
|
|
// SampleConfig ...
|
|
func (r *RabbitMQ) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description ...
|
|
func (r *RabbitMQ) Description() string {
|
|
return "Reads metrics from RabbitMQ servers via the Management Plugin"
|
|
}
|
|
|
|
// Gather ...
|
|
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
|
|
if r.Client == nil {
|
|
tlsCfg, err := r.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tr := &http.Transport{
|
|
ResponseHeaderTimeout: r.ResponseHeaderTimeout.Duration,
|
|
TLSClientConfig: tlsCfg,
|
|
}
|
|
r.Client = &http.Client{
|
|
Transport: tr,
|
|
Timeout: r.ClientTimeout.Duration,
|
|
}
|
|
}
|
|
|
|
// Create queue filter if not already created
|
|
if !r.filterCreated {
|
|
err := r.createQueueFilter()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.filterCreated = true
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(gatherFunctions))
|
|
for _, f := range gatherFunctions {
|
|
go func(gf gatherFunc) {
|
|
defer wg.Done()
|
|
gf(r, acc)
|
|
}(f)
|
|
}
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
|
|
if r.URL == "" {
|
|
r.URL = DefaultURL
|
|
}
|
|
u = fmt.Sprintf("%s%s", r.URL, u)
|
|
|
|
req, err := http.NewRequest("GET", u, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
username := r.Username
|
|
if username == "" {
|
|
username = DefaultUsername
|
|
}
|
|
|
|
password := r.Password
|
|
if password == "" {
|
|
password = DefaultPassword
|
|
}
|
|
|
|
req.SetBasicAuth(username, password)
|
|
|
|
resp, err := r.Client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
json.NewDecoder(resp.Body).Decode(target)
|
|
|
|
return nil
|
|
}
|
|
|
|
func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
|
|
overview := &OverviewResponse{}
|
|
|
|
err := r.requestJSON("/api/overview", &overview)
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
return
|
|
}
|
|
|
|
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
|
|
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
|
|
return
|
|
}
|
|
|
|
var clustering_listeners, amqp_listeners int64 = 0, 0
|
|
for _, listener := range overview.Listeners {
|
|
if listener.Protocol == "clustering" {
|
|
clustering_listeners++
|
|
} else if listener.Protocol == "amqp" {
|
|
amqp_listeners++
|
|
}
|
|
}
|
|
|
|
tags := map[string]string{"url": r.URL}
|
|
if r.Name != "" {
|
|
tags["name"] = r.Name
|
|
}
|
|
fields := map[string]interface{}{
|
|
"messages": overview.QueueTotals.Messages,
|
|
"messages_ready": overview.QueueTotals.MessagesReady,
|
|
"messages_unacked": overview.QueueTotals.MessagesUnacknowledged,
|
|
"channels": overview.ObjectTotals.Channels,
|
|
"connections": overview.ObjectTotals.Connections,
|
|
"consumers": overview.ObjectTotals.Consumers,
|
|
"exchanges": overview.ObjectTotals.Exchanges,
|
|
"queues": overview.ObjectTotals.Queues,
|
|
"messages_acked": overview.MessageStats.Ack,
|
|
"messages_delivered": overview.MessageStats.Deliver,
|
|
"messages_delivered_get": overview.MessageStats.DeliverGet,
|
|
"messages_published": overview.MessageStats.Publish,
|
|
"clustering_listeners": clustering_listeners,
|
|
"amqp_listeners": amqp_listeners,
|
|
}
|
|
acc.AddFields("rabbitmq_overview", fields, tags)
|
|
}
|
|
|
|
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
|
nodes := make([]Node, 0)
|
|
// Gather information about nodes
|
|
err := r.requestJSON("/api/nodes", &nodes)
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
return
|
|
}
|
|
now := time.Now()
|
|
|
|
for _, node := range nodes {
|
|
if !r.shouldGatherNode(node) {
|
|
continue
|
|
}
|
|
|
|
tags := map[string]string{"url": r.URL}
|
|
tags["node"] = node.Name
|
|
|
|
var running int64 = 0
|
|
if node.Running {
|
|
running = 1
|
|
}
|
|
|
|
fields := map[string]interface{}{
|
|
"disk_free": node.DiskFree,
|
|
"disk_free_limit": node.DiskFreeLimit,
|
|
"fd_total": node.FdTotal,
|
|
"fd_used": node.FdUsed,
|
|
"mem_limit": node.MemLimit,
|
|
"mem_used": node.MemUsed,
|
|
"proc_total": node.ProcTotal,
|
|
"proc_used": node.ProcUsed,
|
|
"run_queue": node.RunQueue,
|
|
"sockets_total": node.SocketsTotal,
|
|
"sockets_used": node.SocketsUsed,
|
|
"running": running,
|
|
}
|
|
acc.AddFields("rabbitmq_node", fields, tags, now)
|
|
}
|
|
}
|
|
|
|
func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
|
|
if r.excludeEveryQueue {
|
|
return
|
|
}
|
|
// Gather information about queues
|
|
queues := make([]Queue, 0)
|
|
err := r.requestJSON("/api/queues", &queues)
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
return
|
|
}
|
|
|
|
for _, queue := range queues {
|
|
if !r.queueFilter.Match(queue.Name) {
|
|
continue
|
|
}
|
|
tags := map[string]string{
|
|
"url": r.URL,
|
|
"queue": queue.Name,
|
|
"vhost": queue.Vhost,
|
|
"node": queue.Node,
|
|
"durable": strconv.FormatBool(queue.Durable),
|
|
"auto_delete": strconv.FormatBool(queue.AutoDelete),
|
|
}
|
|
|
|
acc.AddFields(
|
|
"rabbitmq_queue",
|
|
map[string]interface{}{
|
|
// common information
|
|
"consumers": queue.Consumers,
|
|
"consumer_utilisation": queue.ConsumerUtilisation,
|
|
"idle_since": queue.IdleSince,
|
|
"memory": queue.Memory,
|
|
// messages information
|
|
"message_bytes": queue.MessageBytes,
|
|
"message_bytes_ready": queue.MessageBytesReady,
|
|
"message_bytes_unacked": queue.MessageBytesUnacknowledged,
|
|
"message_bytes_ram": queue.MessageRAM,
|
|
"message_bytes_persist": queue.MessagePersistent,
|
|
"messages": queue.Messages,
|
|
"messages_ready": queue.MessagesReady,
|
|
"messages_unack": queue.MessagesUnacknowledged,
|
|
"messages_ack": queue.MessageStats.Ack,
|
|
"messages_ack_rate": queue.MessageStats.AckDetails.Rate,
|
|
"messages_deliver": queue.MessageStats.Deliver,
|
|
"messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate,
|
|
"messages_deliver_get": queue.MessageStats.DeliverGet,
|
|
"messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate,
|
|
"messages_publish": queue.MessageStats.Publish,
|
|
"messages_publish_rate": queue.MessageStats.PublishDetails.Rate,
|
|
"messages_redeliver": queue.MessageStats.Redeliver,
|
|
"messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate,
|
|
},
|
|
tags,
|
|
)
|
|
}
|
|
}
|
|
|
|
func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
|
|
// Gather information about exchanges
|
|
exchanges := make([]Exchange, 0)
|
|
err := r.requestJSON("/api/exchanges", &exchanges)
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
return
|
|
}
|
|
|
|
for _, exchange := range exchanges {
|
|
if !r.shouldGatherExchange(exchange) {
|
|
continue
|
|
}
|
|
tags := map[string]string{
|
|
"url": r.URL,
|
|
"exchange": exchange.Name,
|
|
"type": exchange.Type,
|
|
"vhost": exchange.Vhost,
|
|
"internal": strconv.FormatBool(exchange.Internal),
|
|
"durable": strconv.FormatBool(exchange.Durable),
|
|
"auto_delete": strconv.FormatBool(exchange.AutoDelete),
|
|
}
|
|
|
|
acc.AddFields(
|
|
"rabbitmq_exchange",
|
|
map[string]interface{}{
|
|
"messages_publish_in": exchange.MessageStats.PublishIn,
|
|
"messages_publish_out": exchange.MessageStats.PublishOut,
|
|
},
|
|
tags,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
|
if len(r.Nodes) == 0 {
|
|
return true
|
|
}
|
|
|
|
for _, name := range r.Nodes {
|
|
if name == node.Name {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (r *RabbitMQ) createQueueFilter() error {
|
|
// Backwards compatibility for deprecated `queues` parameter.
|
|
if len(r.Queues) > 0 {
|
|
r.QueueInclude = append(r.QueueInclude, r.Queues...)
|
|
}
|
|
|
|
filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.queueFilter = filter
|
|
|
|
for _, q := range r.QueueExclude {
|
|
if q == "*" {
|
|
r.excludeEveryQueue = true
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
|
|
if len(r.Exchanges) == 0 {
|
|
return true
|
|
}
|
|
|
|
for _, name := range r.Exchanges {
|
|
if name == exchange.Name {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("rabbitmq", func() telegraf.Input {
|
|
return &RabbitMQ{
|
|
ResponseHeaderTimeout: internal.Duration{Duration: DefaultResponseHeaderTimeout * time.Second},
|
|
ClientTimeout: internal.Duration{Duration: DefaultClientTimeout * time.Second},
|
|
}
|
|
})
|
|
}
|