Add gathering of RabbitMQ federation link metrics (#6283)

This commit is contained in:
Jacques Heunis 2019-10-24 02:08:19 +02:00 committed by Daniel Nelson
parent 1761e25c96
commit 8b3a8d1113
4 changed files with 228 additions and 7 deletions

View File

@ -48,6 +48,13 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
## A list of federation upstreams to gather as the rabbitmq_federation measurement.
## If not specified, metrics for all federation upstreams are gathered.
## Federation link metrics will only be gathered for queues and exchanges
## whose non-federation metrics will be collected (e.g a queue excluded
## by the 'queue_name_exclude' option will also be excluded from federation).
# federation_upstreams = ["dataCentre2"]
## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
# queue_name_include = []
@ -158,6 +165,16 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
- messages_publish_out (int, count)
- messages_publish_out_rate (int, messages per second)
- rabbitmq_federation
- acks_uncommitted (int, count)
- consumers (int, count)
- messages_unacknowledged (int, count)
- messages_uncommitted (int, count)
- messages_unconfirmed (int, count)
- messages_confirm (int, count)
- messages_publish (int, count)
- messages_return_unroutable (int, count)
### Tags:
- All measurements have the following tags:
@ -187,6 +204,14 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
- durable
- auto_delete
- rabbitmq_federation
- url
- vhost
- type
- upstream
- local_entity
- upstream_entity
### Sample Queries:
Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:

View File

@ -47,14 +47,17 @@ type RabbitMQ struct {
Queues []string
Exchanges []string
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
FederationUpstreamInclude []string `toml:"federation_upstream_include"`
FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`
Client *http.Client
filterCreated bool
excludeEveryQueue bool
queueFilter filter.Filter
upstreamFilter filter.Filter
}
// OverviewResponse ...
@ -178,6 +181,38 @@ type Exchange struct {
AutoDelete bool `json:"auto_delete"`
}
// FederationLinkChannelMessageStats ...
type FederationLinkChannelMessageStats struct {
Confirm int64 `json:"confirm"`
ConfirmDetails Details `json:"confirm_details"`
Publish int64 `json:"publish"`
PublishDetails Details `json:"publish_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}
// FederationLinkChannel ...
type FederationLinkChannel struct {
AcksUncommitted int64 `json:"acks_uncommitted"`
ConsumerCount int64 `json:"consumer_count"`
MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
MessagesUncommitted int64 `json:"messages_uncommitted"`
MessagesUnconfirmed int64 `json:"messages_unconfirmed"`
MessageStats FederationLinkChannelMessageStats `json:"message_stats"`
}
// FederationLink ...
type FederationLink struct {
Type string `json:"type"`
Queue string `json:"queue"`
UpstreamQueue string `json:"upstream_queue"`
Exchange string `json:"exchange"`
UpstreamExchange string `json:"upstream_exchange"`
Vhost string `json:"vhost"`
Upstream string `json:"upstream"`
LocalChannel FederationLinkChannel `json:"local_channel"`
}
type HealthCheck struct {
Status string `json:"status"`
}
@ -214,7 +249,7 @@ type Memory struct {
// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges, gatherFederationLinks}
var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
@ -258,6 +293,15 @@ var sampleConfig = `
## Note that an empty array for both will include all queues
queue_name_include = []
queue_name_exclude = []
## Federation upstreams include and exclude when gathering the rabbitmq_federation measurement.
## If neither are specified, metrics for all federation upstreams are gathered.
## Federation link metrics will only be gathered for queues and exchanges
## whose non-federation metrics will be collected (e.g a queue excluded
## by the 'queue_name_exclude' option will also be excluded from federation).
## Globs accepted.
# federation_upstream_include = ["dataCentre-*"]
# federation_upstream_exclude = []
`
func boolToInt(b bool) int64 {
@ -294,12 +338,16 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
}
}
// Create queue filter if not already created
// Create gather filters if not already created
if !r.filterCreated {
err := r.createQueueFilter()
if err != nil {
return err
}
err = r.createUpstreamFilter()
if err != nil {
return err
}
r.filterCreated = true
}
@ -598,7 +646,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
}
for _, exchange := range exchanges {
if !r.shouldGatherExchange(exchange) {
if !r.shouldGatherExchange(exchange.Name) {
continue
}
tags := map[string]string{
@ -624,6 +672,52 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
}
}
func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
// Gather information about federation links
federationLinks := make([]FederationLink, 0)
err := r.requestJSON("/api/federation-links", &federationLinks)
if err != nil {
acc.AddError(err)
return
}
for _, link := range federationLinks {
if !r.shouldGatherFederationLink(link) {
continue
}
tags := map[string]string{
"url": r.URL,
"type": link.Type,
"vhost": link.Vhost,
"upstream": link.Upstream,
}
if link.Type == "exchange" {
tags["exchange"] = link.Exchange
tags["upstream_exchange"] = link.UpstreamExchange
} else {
tags["queue"] = link.Queue
tags["upstream_queue"] = link.UpstreamQueue
}
acc.AddFields(
"rabbitmq_federation",
map[string]interface{}{
"acks_uncommitted": link.LocalChannel.AcksUncommitted,
"consumers": link.LocalChannel.ConsumerCount,
"messages_unacknowledged": link.LocalChannel.MessagesUnacknowledged,
"messages_uncommitted": link.LocalChannel.MessagesUncommitted,
"messages_unconfirmed": link.LocalChannel.MessagesUnconfirmed,
"messages_confirm": link.LocalChannel.MessageStats.Confirm,
"messages_publish": link.LocalChannel.MessageStats.Publish,
"messages_return_unroutable": link.LocalChannel.MessageStats.ReturnUnroutable,
},
tags,
)
}
}
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 {
return true
@ -659,13 +753,23 @@ func (r *RabbitMQ) createQueueFilter() error {
return nil
}
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
func (r *RabbitMQ) createUpstreamFilter() error {
upstreamFilter, err := filter.NewIncludeExcludeFilter(r.FederationUpstreamInclude, r.FederationUpstreamExclude)
if err != nil {
return err
}
r.upstreamFilter = upstreamFilter
return nil
}
func (r *RabbitMQ) shouldGatherExchange(exchangeName string) bool {
if len(r.Exchanges) == 0 {
return true
}
for _, name := range r.Exchanges {
if name == exchange.Name {
if name == exchangeName {
return true
}
}
@ -673,6 +777,21 @@ func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
return false
}
func (r *RabbitMQ) shouldGatherFederationLink(link FederationLink) bool {
if !r.upstreamFilter.Match(link.Upstream) {
return false
}
switch link.Type {
case "exchange":
return r.shouldGatherExchange(link.Exchange)
case "queue":
return r.queueFilter.Match(link.Queue)
default:
return false
}
}
func init() {
inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{

View File

@ -28,6 +28,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
jsonFilePath = "testdata/exchanges.json"
case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64":
jsonFilePath = "testdata/healthchecks.json"
case "/api/federation-links":
jsonFilePath = "testdata/federation-links.json"
case "/api/nodes/rabbit@vagrant-ubuntu-trusty-64/memory":
jsonFilePath = "testdata/memory.json"
default:
@ -162,6 +164,18 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"messages_publish_out_rate": 5.1,
}
compareMetrics(t, exchangeMetrics, acc, "rabbitmq_exchange")
federationLinkMetrics := map[string]interface{}{
"acks_uncommitted": 1,
"consumers": 2,
"messages_unacknowledged": 3,
"messages_uncommitted": 4,
"messages_unconfirmed": 5,
"messages_confirm": 67,
"messages_publish": 890,
"messages_return_unroutable": 1,
}
compareMetrics(t, federationLinkMetrics, acc, "rabbitmq_federation")
}
func compareMetrics(t *testing.T, expectedMetrics map[string]interface{},

View File

@ -0,0 +1,63 @@
[
{
"node": "rabbit@rmqlocal",
"queue": "exampleLocalQueue",
"upstream_queue": "exampleUpstreamQueue",
"type": "queue",
"vhost": "/",
"upstream": "ExampleFederationUpstream",
"id": "8ba5218f",
"status": "running",
"local_connection": "<rabbit@somehost>",
"uri": "amqp://appsv03",
"timestamp": "2019-08-19 15:34:15",
"local_channel": {
"acks_uncommitted": 1,
"confirm": true,
"connection_details": {
"name": "<rabbit@somehost>",
"peer_host": "undefined",
"peer_port": "undefined"
},
"consumer_count": 2,
"garbage_collection": {
"fullsweep_after": 65535,
"max_heap_size": 0,
"min_bin_vheap_size": 46422,
"min_heap_size": 233,
"minor_gcs": 203
},
"global_prefetch_count": 0,
"message_stats": {
"confirm": 67,
"confirm_details": {
"rate": 2
},
"publish": 890,
"publish_details": {
"rate": 2
},
"return_unroutable": 1,
"return_unroutable_details": {
"rate": 0.1
}
},
"messages_unacknowledged": 3,
"messages_uncommitted": 4,
"messages_unconfirmed": 5,
"name": "<rabbit@somehost>",
"node": "rabbit@rmqlocal",
"number": 1,
"prefetch_count": 0,
"reductions": 1926653,
"reductions_details": {
"rate": 1068
},
"state": "running",
"transactional": false,
"user": "none",
"user_who_performed_action": "none",
"vhost": "sorandomsorandom"
}
}
]