Revert "Rabbitmq plugin: connection-related metrics." (#2169)
This commit is contained in:
parent
17b307a7bc
commit
a970b9c62c
|
@ -50,9 +50,8 @@ type RabbitMQ struct {
|
||||||
ClientTimeout internal.Duration `toml:"client_timeout"`
|
ClientTimeout internal.Duration `toml:"client_timeout"`
|
||||||
|
|
||||||
// InsecureSkipVerify bool
|
// InsecureSkipVerify bool
|
||||||
Nodes []string
|
Nodes []string
|
||||||
Queues []string
|
Queues []string
|
||||||
Connections []string
|
|
||||||
|
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
}
|
}
|
||||||
|
@ -136,22 +135,10 @@ type Node struct {
|
||||||
SocketsUsed int64 `json:"sockets_used"`
|
SocketsUsed int64 `json:"sockets_used"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection ...
|
|
||||||
type Connection struct {
|
|
||||||
Name string
|
|
||||||
State string
|
|
||||||
Vhost string
|
|
||||||
Host string
|
|
||||||
Node string
|
|
||||||
ReceiveCount int64 `json:"recv_cnt"`
|
|
||||||
SendCount int64 `json:"send_cnt"`
|
|
||||||
SendPend int64 `json:"send_pend"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// gatherFunc ...
|
// gatherFunc ...
|
||||||
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
|
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
|
||||||
|
|
||||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherConnections}
|
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
# url = "http://localhost:15672"
|
# url = "http://localhost:15672"
|
||||||
|
@ -393,42 +380,6 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherConnections(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
|
||||||
// Gather information about connections
|
|
||||||
connections := make([]Connection, 0)
|
|
||||||
err := r.requestJSON("/api/connections", &connections)
|
|
||||||
if err != nil {
|
|
||||||
errChan <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, connection := range connections {
|
|
||||||
if !r.shouldGatherConnection(connection) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"url": r.URL,
|
|
||||||
"connection": connection.Name,
|
|
||||||
"vhost": connection.Vhost,
|
|
||||||
"host": connection.Host,
|
|
||||||
"node": connection.Node,
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields(
|
|
||||||
"rabbitmq_connection",
|
|
||||||
map[string]interface{}{
|
|
||||||
"recv_cnt": connection.ReceiveCount,
|
|
||||||
"send_cnt": connection.SendCount,
|
|
||||||
"send_pend": connection.SendPend,
|
|
||||||
"state": connection.State,
|
|
||||||
},
|
|
||||||
tags,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
errChan <- nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
||||||
if len(r.Nodes) == 0 {
|
if len(r.Nodes) == 0 {
|
||||||
return true
|
return true
|
||||||
|
@ -457,20 +408,6 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RabbitMQ) shouldGatherConnection(connection Connection) bool {
|
|
||||||
if len(r.Connections) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, name := range r.Connections {
|
|
||||||
if name == connection.Name {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("rabbitmq", func() telegraf.Input {
|
inputs.Add("rabbitmq", func() telegraf.Input {
|
||||||
return &RabbitMQ{
|
return &RabbitMQ{
|
||||||
|
|
|
@ -374,57 +374,6 @@ const sampleQueuesResponse = `
|
||||||
]
|
]
|
||||||
`
|
`
|
||||||
|
|
||||||
const sampleConnectionsResponse = `
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"recv_oct": 166055,
|
|
||||||
"recv_oct_details": {
|
|
||||||
"rate": 0
|
|
||||||
},
|
|
||||||
"send_oct": 589,
|
|
||||||
"send_oct_details": {
|
|
||||||
"rate": 0
|
|
||||||
},
|
|
||||||
"recv_cnt": 124,
|
|
||||||
"send_cnt": 7,
|
|
||||||
"send_pend": 0,
|
|
||||||
"state": "running",
|
|
||||||
"channels": 1,
|
|
||||||
"type": "network",
|
|
||||||
"node": "rabbit@ip-10-0-12-133",
|
|
||||||
"name": "10.0.10.8:32774 -> 10.0.12.131:5672",
|
|
||||||
"port": 5672,
|
|
||||||
"peer_port": 32774,
|
|
||||||
"host": "10.0.12.131",
|
|
||||||
"peer_host": "10.0.10.8",
|
|
||||||
"ssl": false,
|
|
||||||
"peer_cert_subject": null,
|
|
||||||
"peer_cert_issuer": null,
|
|
||||||
"peer_cert_validity": null,
|
|
||||||
"auth_mechanism": "AMQPLAIN",
|
|
||||||
"ssl_protocol": null,
|
|
||||||
"ssl_key_exchange": null,
|
|
||||||
"ssl_cipher": null,
|
|
||||||
"ssl_hash": null,
|
|
||||||
"protocol": "AMQP 0-9-1",
|
|
||||||
"user": "workers",
|
|
||||||
"vhost": "main",
|
|
||||||
"timeout": 0,
|
|
||||||
"frame_max": 131072,
|
|
||||||
"channel_max": 65535,
|
|
||||||
"client_properties": {
|
|
||||||
"product": "py-amqp",
|
|
||||||
"product_version": "1.4.7",
|
|
||||||
"capabilities": {
|
|
||||||
"connection.blocked": true,
|
|
||||||
"consumer_cancel_notify": true
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"connected_at": 1476647837266
|
|
||||||
}
|
|
||||||
]
|
|
||||||
`
|
|
||||||
|
|
||||||
func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
var rsp string
|
var rsp string
|
||||||
|
@ -436,8 +385,6 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
rsp = sampleNodesResponse
|
rsp = sampleNodesResponse
|
||||||
case "/api/queues":
|
case "/api/queues":
|
||||||
rsp = sampleQueuesResponse
|
rsp = sampleQueuesResponse
|
||||||
case "/api/connections":
|
|
||||||
rsp = sampleConnectionsResponse
|
|
||||||
default:
|
default:
|
||||||
panic("Cannot handle request")
|
panic("Cannot handle request")
|
||||||
}
|
}
|
||||||
|
@ -494,15 +441,4 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
|
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
|
||||||
|
|
||||||
assert.True(t, acc.HasMeasurement("rabbitmq_connection"))
|
|
||||||
|
|
||||||
connection_fields := map[string]interface{}{
|
|
||||||
"recv_cnt": int64(124),
|
|
||||||
"send_cnt": int64(7),
|
|
||||||
"send_pend": int64(0),
|
|
||||||
"state": "running",
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AssertContainsFields(t, "rabbitmq_connection", connection_fields)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue