Add ActiveMQ input plugin (#2689)
This commit is contained in:
parent
31e1c04ed0
commit
b9ff1d042b
|
@ -127,6 +127,7 @@ configuration options.
|
||||||
|
|
||||||
## Input Plugins
|
## Input Plugins
|
||||||
|
|
||||||
|
* [activemq](./plugins/inputs/activemq)
|
||||||
* [aerospike](./plugins/inputs/aerospike)
|
* [aerospike](./plugins/inputs/aerospike)
|
||||||
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
|
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
|
||||||
* [apache](./plugins/inputs/apache)
|
* [apache](./plugins/inputs/apache)
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
# Telegraf Input Plugin: ActiveMQ
|
||||||
|
|
||||||
|
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Description
|
||||||
|
[[inputs.activemq]]
|
||||||
|
## Required ActiveMQ Endpoint
|
||||||
|
# server = "192.168.50.10"
|
||||||
|
|
||||||
|
## Required ActiveMQ port
|
||||||
|
# port = 8161
|
||||||
|
|
||||||
|
## Credentials for basic HTTP authentication
|
||||||
|
# username = "admin"
|
||||||
|
# password = "admin"
|
||||||
|
|
||||||
|
## Required ActiveMQ webadmin root path
|
||||||
|
# webadmin = "admin"
|
||||||
|
|
||||||
|
## Maximum time to receive response.
|
||||||
|
# response_timeout = "5s"
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
```
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.
|
||||||
|
|
||||||
|
- activemq_queues:
|
||||||
|
- size
|
||||||
|
- consumer_count
|
||||||
|
- enqueue_count
|
||||||
|
- dequeue_count
|
||||||
|
- activemq_topics:
|
||||||
|
- size
|
||||||
|
- consumer_count
|
||||||
|
- enqueue_count
|
||||||
|
- dequeue_count
|
||||||
|
- subscribers_metrics:
|
||||||
|
- pending_queue_size
|
||||||
|
- dispatched_queue_size
|
||||||
|
- dispatched_counter
|
||||||
|
- enqueue_counter
|
||||||
|
- dequeue_counter
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
- activemq_queues:
|
||||||
|
- name
|
||||||
|
- source
|
||||||
|
- port
|
||||||
|
- activemq_topics:
|
||||||
|
- name
|
||||||
|
- source
|
||||||
|
- port
|
||||||
|
- activemq_subscribers:
|
||||||
|
- client_id
|
||||||
|
- subscription_name
|
||||||
|
- connection_id
|
||||||
|
- destination_name
|
||||||
|
- selector
|
||||||
|
- active
|
||||||
|
- source
|
||||||
|
- port
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ ./telegraf -config telegraf.conf -input-filter activemq -test
|
||||||
|
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
|
||||||
|
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
|
||||||
|
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000
|
||||||
|
activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000
|
||||||
|
activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000
|
||||||
|
activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000
|
||||||
|
activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
|
||||||
|
activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000
|
||||||
|
```
|
|
@ -0,0 +1,261 @@
|
||||||
|
package activemq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/internal/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ActiveMQ struct {
|
||||||
|
Server string `json:"server"`
|
||||||
|
Port int `json:"port"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
Password string `json:"password"`
|
||||||
|
Webadmin string `json:"webadmin"`
|
||||||
|
ResponseTimeout internal.Duration
|
||||||
|
tls.ClientConfig
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
type Topics struct {
|
||||||
|
XMLName xml.Name `xml:"topics"`
|
||||||
|
TopicItems []Topic `xml:"topic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Topic struct {
|
||||||
|
XMLName xml.Name `xml:"topic"`
|
||||||
|
Name string `xml:"name,attr"`
|
||||||
|
Stats Stats `xml:"stats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Subscribers struct {
|
||||||
|
XMLName xml.Name `xml:"subscribers"`
|
||||||
|
SubscriberItems []Subscriber `xml:"subscriber"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Subscriber struct {
|
||||||
|
XMLName xml.Name `xml:"subscriber"`
|
||||||
|
ClientId string `xml:"clientId,attr"`
|
||||||
|
SubscriptionName string `xml:"subscriptionName,attr"`
|
||||||
|
ConnectionId string `xml:"connectionId,attr"`
|
||||||
|
DestinationName string `xml:"destinationName,attr"`
|
||||||
|
Selector string `xml:"selector,attr"`
|
||||||
|
Active string `xml:"active,attr"`
|
||||||
|
Stats Stats `xml:"stats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Queues struct {
|
||||||
|
XMLName xml.Name `xml:"queues"`
|
||||||
|
QueueItems []Queue `xml:"queue"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Queue struct {
|
||||||
|
XMLName xml.Name `xml:"queue"`
|
||||||
|
Name string `xml:"name,attr"`
|
||||||
|
Stats Stats `xml:"stats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Stats struct {
|
||||||
|
XMLName xml.Name `xml:"stats"`
|
||||||
|
Size int `xml:"size,attr"`
|
||||||
|
ConsumerCount int `xml:"consumerCount,attr"`
|
||||||
|
EnqueueCount int `xml:"enqueueCount,attr"`
|
||||||
|
DequeueCount int `xml:"dequeueCount,attr"`
|
||||||
|
PendingQueueSize int `xml:"pendingQueueSize,attr"`
|
||||||
|
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"`
|
||||||
|
DispatchedCounter int `xml:"dispatchedCounter,attr"`
|
||||||
|
EnqueueCounter int `xml:"enqueueCounter,attr"`
|
||||||
|
DequeueCounter int `xml:"dequeueCounter,attr"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
QUEUES_STATS = "queues"
|
||||||
|
TOPICS_STATS = "topics"
|
||||||
|
SUBSCRIBERS_STATS = "subscribers"
|
||||||
|
)
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## Required ActiveMQ Endpoint
|
||||||
|
# server = "192.168.50.10"
|
||||||
|
|
||||||
|
## Required ActiveMQ port
|
||||||
|
# port = 8161
|
||||||
|
|
||||||
|
## Credentials for basic HTTP authentication
|
||||||
|
# username = "admin"
|
||||||
|
# password = "admin"
|
||||||
|
|
||||||
|
## Required ActiveMQ webadmin root path
|
||||||
|
# webadmin = "admin"
|
||||||
|
|
||||||
|
## Maximum time to receive response.
|
||||||
|
# response_timeout = "5s"
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
`
|
||||||
|
|
||||||
|
func (a *ActiveMQ) Description() string {
|
||||||
|
return "Gather ActiveMQ metrics"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) createHttpClient() (*http.Client, error) {
|
||||||
|
tlsCfg, err := a.ClientConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
|
Timeout: a.ResponseTimeout.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
|
||||||
|
if a.ResponseTimeout.Duration < time.Second {
|
||||||
|
a.ResponseTimeout.Duration = time.Second * 5
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.client == nil {
|
||||||
|
client, err := a.createHttpClient()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
a.client = client
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.SetBasicAuth(a.Username, a.Password)
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
return ioutil.ReadAll(resp.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) {
|
||||||
|
for _, queue := range queues.QueueItems {
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["name"] = strings.TrimSpace(queue.Name)
|
||||||
|
tags["source"] = a.Server
|
||||||
|
tags["port"] = strconv.Itoa(a.Port)
|
||||||
|
|
||||||
|
records["size"] = queue.Stats.Size
|
||||||
|
records["consumer_count"] = queue.Stats.ConsumerCount
|
||||||
|
records["enqueue_count"] = queue.Stats.EnqueueCount
|
||||||
|
records["dequeue_count"] = queue.Stats.DequeueCount
|
||||||
|
|
||||||
|
acc.AddFields("activemq_queues", records, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) {
|
||||||
|
for _, topic := range topics.TopicItems {
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["name"] = topic.Name
|
||||||
|
tags["source"] = a.Server
|
||||||
|
tags["port"] = strconv.Itoa(a.Port)
|
||||||
|
|
||||||
|
records["size"] = topic.Stats.Size
|
||||||
|
records["consumer_count"] = topic.Stats.ConsumerCount
|
||||||
|
records["enqueue_count"] = topic.Stats.EnqueueCount
|
||||||
|
records["dequeue_count"] = topic.Stats.DequeueCount
|
||||||
|
|
||||||
|
acc.AddFields("activemq_topics", records, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) {
|
||||||
|
for _, subscriber := range subscribers.SubscriberItems {
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["client_id"] = subscriber.ClientId
|
||||||
|
tags["subscription_name"] = subscriber.SubscriptionName
|
||||||
|
tags["connection_id"] = subscriber.ConnectionId
|
||||||
|
tags["destination_name"] = subscriber.DestinationName
|
||||||
|
tags["selector"] = subscriber.Selector
|
||||||
|
tags["active"] = subscriber.Active
|
||||||
|
tags["source"] = a.Server
|
||||||
|
tags["port"] = strconv.Itoa(a.Port)
|
||||||
|
|
||||||
|
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
|
||||||
|
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
|
||||||
|
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter
|
||||||
|
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter
|
||||||
|
records["dequeue_counter"] = subscriber.Stats.DequeueCounter
|
||||||
|
|
||||||
|
acc.AddFields("activemq_subscribers", records, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
|
||||||
|
dataQueues, err := a.GetMetrics(QUEUES_STATS)
|
||||||
|
queues := Queues{}
|
||||||
|
err = xml.Unmarshal(dataQueues, &queues)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataTopics, err := a.GetMetrics(TOPICS_STATS)
|
||||||
|
topics := Topics{}
|
||||||
|
err = xml.Unmarshal(dataTopics, &topics)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
|
||||||
|
subscribers := Subscribers{}
|
||||||
|
err = xml.Unmarshal(dataSubscribers, &subscribers)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.GatherQueuesMetrics(acc, queues)
|
||||||
|
a.GatherTopicsMetrics(acc, topics)
|
||||||
|
a.GatherSubscribersMetrics(acc, subscribers)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("activemq", func() telegraf.Input {
|
||||||
|
return &ActiveMQ{
|
||||||
|
Server: "localhost",
|
||||||
|
Port: 8161,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,139 @@
|
||||||
|
package activemq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGatherQueuesMetrics(t *testing.T) {
|
||||||
|
|
||||||
|
s := `<queues>
|
||||||
|
<queue name="sandra">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="0" dequeueCount="0"/>
|
||||||
|
<feed>
|
||||||
|
<atom>queueBrowse/sandra?view=rss&feedType=atom_1.0</atom>
|
||||||
|
<rss>queueBrowse/sandra?view=rss&feedType=rss_2.0</rss>
|
||||||
|
</feed>
|
||||||
|
</queue>
|
||||||
|
<queue name="Test">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="0" dequeueCount="0"/>
|
||||||
|
<feed>
|
||||||
|
<atom>queueBrowse/Test?view=rss&feedType=atom_1.0</atom>
|
||||||
|
<rss>queueBrowse/Test?view=rss&feedType=rss_2.0</rss>
|
||||||
|
</feed>
|
||||||
|
</queue>
|
||||||
|
</queues>`
|
||||||
|
|
||||||
|
queues := Queues{}
|
||||||
|
|
||||||
|
xml.Unmarshal([]byte(s), &queues)
|
||||||
|
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["name"] = "Test"
|
||||||
|
tags["source"] = "localhost"
|
||||||
|
tags["port"] = "8161"
|
||||||
|
|
||||||
|
records["size"] = 0
|
||||||
|
records["consumer_count"] = 0
|
||||||
|
records["enqueue_count"] = 0
|
||||||
|
records["dequeue_count"] = 0
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
activeMQ := new(ActiveMQ)
|
||||||
|
activeMQ.Server = "localhost"
|
||||||
|
activeMQ.Port = 8161
|
||||||
|
|
||||||
|
activeMQ.GatherQueuesMetrics(&acc, queues)
|
||||||
|
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherTopicsMetrics(t *testing.T) {
|
||||||
|
|
||||||
|
s := `<topics>
|
||||||
|
<topic name="ActiveMQ.Advisory.MasterBroker ">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/>
|
||||||
|
</topic>
|
||||||
|
<topic name="AAA ">
|
||||||
|
<stats size="0" consumerCount="1" enqueueCount="0" dequeueCount="0"/>
|
||||||
|
</topic>
|
||||||
|
<topic name="ActiveMQ.Advisory.Topic ">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/>
|
||||||
|
</topic>
|
||||||
|
<topic name="ActiveMQ.Advisory.Queue ">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="2" dequeueCount="0"/>
|
||||||
|
</topic>
|
||||||
|
<topic name="AAAA ">
|
||||||
|
<stats size="0" consumerCount="0" enqueueCount="0" dequeueCount="0"/>
|
||||||
|
</topic>
|
||||||
|
</topics>`
|
||||||
|
|
||||||
|
topics := Topics{}
|
||||||
|
|
||||||
|
xml.Unmarshal([]byte(s), &topics)
|
||||||
|
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["name"] = "ActiveMQ.Advisory.MasterBroker "
|
||||||
|
tags["source"] = "localhost"
|
||||||
|
tags["port"] = "8161"
|
||||||
|
|
||||||
|
records["size"] = 0
|
||||||
|
records["consumer_count"] = 0
|
||||||
|
records["enqueue_count"] = 1
|
||||||
|
records["dequeue_count"] = 0
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
activeMQ := new(ActiveMQ)
|
||||||
|
activeMQ.Server = "localhost"
|
||||||
|
activeMQ.Port = 8161
|
||||||
|
|
||||||
|
activeMQ.GatherTopicsMetrics(&acc, topics)
|
||||||
|
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherSubscribersMetrics(t *testing.T) {
|
||||||
|
|
||||||
|
s := `<subscribers>
|
||||||
|
<subscriber clientId="AAA" subscriptionName="AAA" connectionId="NOTSET" destinationName="AAA" selector="AA" active="no">
|
||||||
|
<stats pendingQueueSize="0" dispatchedQueueSize="0" dispatchedCounter="0" enqueueCounter="0" dequeueCounter="0"/>
|
||||||
|
</subscriber>
|
||||||
|
</subscribers>`
|
||||||
|
|
||||||
|
subscribers := Subscribers{}
|
||||||
|
|
||||||
|
xml.Unmarshal([]byte(s), &subscribers)
|
||||||
|
|
||||||
|
records := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["client_id"] = "AAA"
|
||||||
|
tags["subscription_name"] = "AAA"
|
||||||
|
tags["connection_id"] = "NOTSET"
|
||||||
|
tags["destination_name"] = "AAA"
|
||||||
|
tags["selector"] = "AA"
|
||||||
|
tags["active"] = "no"
|
||||||
|
tags["source"] = "localhost"
|
||||||
|
tags["port"] = "8161"
|
||||||
|
|
||||||
|
records["pending_queue_size"] = 0
|
||||||
|
records["dispatched_queue_size"] = 0
|
||||||
|
records["dispatched_counter"] = 0
|
||||||
|
records["enqueue_counter"] = 0
|
||||||
|
records["dequeue_counter"] = 0
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
activeMQ := new(ActiveMQ)
|
||||||
|
activeMQ.Server = "localhost"
|
||||||
|
activeMQ.Port = 8161
|
||||||
|
|
||||||
|
activeMQ.GatherSubscribersMetrics(&acc, subscribers)
|
||||||
|
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package all
|
package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/activemq"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
|
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||||
|
|
Loading…
Reference in New Issue