From 87f1d45ee0f615ff47ab2ec920fd1dd290d3bea0 Mon Sep 17 00:00:00 2001 From: kerams Date: Thu, 4 Jan 2018 02:38:11 +0100 Subject: [PATCH] Add support for exchanges to RabbitMQ input (#3619) --- plugins/inputs/rabbitmq/README.md | 18 ++++ plugins/inputs/rabbitmq/rabbitmq.go | 71 ++++++++++++++- plugins/inputs/rabbitmq/rabbitmq_test.go | 107 +++++++++++++++++++++++ 3 files changed, 193 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index 83e4bd2ee..a1dfc879a 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -40,6 +40,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ## A list of queues to gather as the rabbitmq_queue measurement. If not ## specified, metrics for all queues are gathered. # queues = ["telegraf"] + + ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not + ## specified, metrics for all exchanges are gathered. + # exchanges = ["telegraf"] ``` ### Measurements & Fields: @@ -95,6 +99,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - messages_redeliver_rate (float, messages per second) - messages_unack (integer, count) +- rabbitmq_exchange + - messages_publish_in (int, count) + - messages_publish_out (int, count) + ### Tags: - All measurements have the following tags: @@ -114,6 +122,15 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - durable - auto_delete +- rabbitmq_exchange + - url + - exchange + - type + - vhost + - internal + - durable + - auto_delete + ### Sample Queries: Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query: @@ -129,4 +146,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m) rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000 rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000 +rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 ``` diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index b6b5a6772..775ea75df 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -48,8 +48,9 @@ type RabbitMQ struct { ResponseHeaderTimeout internal.Duration `toml:"header_timeout"` ClientTimeout internal.Duration `toml:"client_timeout"` - Nodes []string - Queues []string + Nodes []string + Queues []string + Exchanges []string Client *http.Client } @@ -78,6 +79,8 @@ type MessageStats struct { PublishDetails Details `json:"publish_details"` Redeliver int64 RedeliverDetails Details `json:"redeliver_details"` + PublishIn int64 `json:"publish_in"` + PublishOut int64 `json:"publish_out"` } // ObjectTotals ... @@ -133,10 +136,20 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } +type Exchange struct { + Name string + MessageStats `json:"message_stats"` + Type string + Internal bool + Vhost string + Durable bool + AutoDelete bool `json:"auto_delete"` +} + // gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator) -var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} +var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges} var sampleConfig = ` ## Management Plugin url. (default: http://localhost:15672) @@ -171,6 +184,10 @@ var sampleConfig = ` ## A list of queues to gather as the rabbitmq_queue measurement. If not ## specified, metrics for all queues are gathered. # queues = ["telegraf"] + + ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not + ## specified, metrics for all exchanges are gathered. + # exchanges = ["telegraf"] ` // SampleConfig ... @@ -374,6 +391,40 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { } } +func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) { + // Gather information about exchanges + exchanges := make([]Exchange, 0) + err := r.requestJSON("/api/exchanges", &exchanges) + if err != nil { + acc.AddError(err) + return + } + + for _, exchange := range exchanges { + if !r.shouldGatherExchange(exchange) { + continue + } + tags := map[string]string{ + "url": r.URL, + "exchange": exchange.Name, + "type": exchange.Type, + "vhost": exchange.Vhost, + "internal": strconv.FormatBool(exchange.Internal), + "durable": strconv.FormatBool(exchange.Durable), + "auto_delete": strconv.FormatBool(exchange.AutoDelete), + } + + acc.AddFields( + "rabbitmq_exchange", + map[string]interface{}{ + "messages_publish_in": exchange.MessageStats.PublishIn, + "messages_publish_out": exchange.MessageStats.PublishOut, + }, + tags, + ) + } +} + func (r *RabbitMQ) shouldGatherNode(node Node) bool { if len(r.Nodes) == 0 { return true @@ -402,6 +453,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { return false } +func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool { + if len(r.Exchanges) == 0 { + return true + } + + for _, name := range r.Exchanges { + if name == exchange.Name { + return true + } + } + + return false +} + func init() { inputs.Add("rabbitmq", func() telegraf.Input { return &RabbitMQ{ diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 3be0259bc..759c71c41 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -374,6 +374,102 @@ const sampleQueuesResponse = ` ] ` +const sampleExchangesResponse = ` +[ + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "direct", + "vhost": "\/", + "name": "" + }, + { + "message_stats": { + "publish_in_details": { + "rate": 0 + }, + "publish_in": 2, + "publish_out_details": { + "rate": 0 + }, + "publish_out": 1 + }, + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "fanout", + "vhost": "\/", + "name": "telegraf" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "direct", + "vhost": "\/", + "name": "amq.direct" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "fanout", + "vhost": "\/", + "name": "amq.fanout" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "headers", + "vhost": "\/", + "name": "amq.headers" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "headers", + "vhost": "\/", + "name": "amq.match" + }, + { + "arguments": { }, + "internal": true, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.rabbitmq.log" + }, + { + "arguments": { }, + "internal": true, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.rabbitmq.trace" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.topic" + } +] +` + func TestRabbitMQGeneratesMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var rsp string @@ -385,6 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { rsp = sampleNodesResponse case "/api/queues": rsp = sampleQueuesResponse + case "/api/exchanges": + rsp = sampleExchangesResponse default: panic("Cannot handle request") } @@ -441,4 +539,13 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { } assert.True(t, acc.HasMeasurement("rabbitmq_queue")) + + exchangeIntMetrics := []string{ + "messages_publish_in", + "messages_publish_out", + } + + for _, metric := range exchangeIntMetrics { + assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric)) + } }