From b9ff1d042b157d057ae574d9f04a64b8ccc2400b Mon Sep 17 00:00:00 2001 From: LABOUARDY Mohamed Date: Tue, 14 Aug 2018 01:34:59 +0200 Subject: [PATCH] Add ActiveMQ input plugin (#2689) --- README.md | 1 + plugins/inputs/activemq/README.md | 86 ++++++++ plugins/inputs/activemq/activemq.go | 261 +++++++++++++++++++++++ plugins/inputs/activemq/activemq_test.go | 139 ++++++++++++ plugins/inputs/all/all.go | 1 + 5 files changed, 488 insertions(+) create mode 100644 plugins/inputs/activemq/README.md create mode 100644 plugins/inputs/activemq/activemq.go create mode 100644 plugins/inputs/activemq/activemq_test.go diff --git a/README.md b/README.md index 6307b5356..700f0dd2a 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ configuration options. ## Input Plugins +* [activemq](./plugins/inputs/activemq) * [aerospike](./plugins/inputs/aerospike) * [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq) * [apache](./plugins/inputs/apache) diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md new file mode 100644 index 000000000..b44d12d22 --- /dev/null +++ b/plugins/inputs/activemq/README.md @@ -0,0 +1,86 @@ +# Telegraf Input Plugin: ActiveMQ + +This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. + +### Configuration: + +```toml +# Description +[[inputs.activemq]] + ## Required ActiveMQ Endpoint + # server = "192.168.50.10" + + ## Required ActiveMQ port + # port = 8161 + + ## Credentials for basic HTTP authentication + # username = "admin" + # password = "admin" + + ## Required ActiveMQ webadmin root path + # webadmin = "admin" + + ## Maximum time to receive response. + # response_timeout = "5s" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification +``` + +### Measurements & Fields: + +Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. + +- activemq_queues: + - size + - consumer_count + - enqueue_count + - dequeue_count + - activemq_topics: + - size + - consumer_count + - enqueue_count + - dequeue_count + - subscribers_metrics: + - pending_queue_size + - dispatched_queue_size + - dispatched_counter + - enqueue_counter + - dequeue_counter + +### Tags: + +- activemq_queues: + - name + - source + - port +- activemq_topics: + - name + - source + - port +- activemq_subscribers: + - client_id + - subscription_name + - connection_id + - destination_name + - selector + - active + - source + - port + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter activemq -test +activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 +activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 +activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 +``` diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go new file mode 100644 index 000000000..5b59730d2 --- /dev/null +++ b/plugins/inputs/activemq/activemq.go @@ -0,0 +1,261 @@ +package activemq + +import ( + "encoding/xml" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "time" + + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type ActiveMQ struct { + Server string `json:"server"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Webadmin string `json:"webadmin"` + ResponseTimeout internal.Duration + tls.ClientConfig + + client *http.Client +} + +type Topics struct { + XMLName xml.Name `xml:"topics"` + TopicItems []Topic `xml:"topic"` +} + +type Topic struct { + XMLName xml.Name `xml:"topic"` + Name string `xml:"name,attr"` + Stats Stats `xml:"stats"` +} + +type Subscribers struct { + XMLName xml.Name `xml:"subscribers"` + SubscriberItems []Subscriber `xml:"subscriber"` +} + +type Subscriber struct { + XMLName xml.Name `xml:"subscriber"` + ClientId string `xml:"clientId,attr"` + SubscriptionName string `xml:"subscriptionName,attr"` + ConnectionId string `xml:"connectionId,attr"` + DestinationName string `xml:"destinationName,attr"` + Selector string `xml:"selector,attr"` + Active string `xml:"active,attr"` + Stats Stats `xml:"stats"` +} + +type Queues struct { + XMLName xml.Name `xml:"queues"` + QueueItems []Queue `xml:"queue"` +} + +type Queue struct { + XMLName xml.Name `xml:"queue"` + Name string `xml:"name,attr"` + Stats Stats `xml:"stats"` +} + +type Stats struct { + XMLName xml.Name `xml:"stats"` + Size int `xml:"size,attr"` + ConsumerCount int `xml:"consumerCount,attr"` + EnqueueCount int `xml:"enqueueCount,attr"` + DequeueCount int `xml:"dequeueCount,attr"` + PendingQueueSize int `xml:"pendingQueueSize,attr"` + DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"` + DispatchedCounter int `xml:"dispatchedCounter,attr"` + EnqueueCounter int `xml:"enqueueCounter,attr"` + DequeueCounter int `xml:"dequeueCounter,attr"` +} + +const ( + QUEUES_STATS = "queues" + TOPICS_STATS = "topics" + SUBSCRIBERS_STATS = "subscribers" +) + +var sampleConfig = ` + ## Required ActiveMQ Endpoint + # server = "192.168.50.10" + + ## Required ActiveMQ port + # port = 8161 + + ## Credentials for basic HTTP authentication + # username = "admin" + # password = "admin" + + ## Required ActiveMQ webadmin root path + # webadmin = "admin" + + ## Maximum time to receive response. + # response_timeout = "5s" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + ` + +func (a *ActiveMQ) Description() string { + return "Gather ActiveMQ metrics" +} + +func (a *ActiveMQ) SampleConfig() string { + return sampleConfig +} + +func (a *ActiveMQ) createHttpClient() (*http.Client, error) { + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: a.ResponseTimeout.Duration, + } + + return client, nil +} + +func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { + if a.ResponseTimeout.Duration < time.Second { + a.ResponseTimeout.Duration = time.Second * 5 + } + + if a.client == nil { + client, err := a.createHttpClient() + if err != nil { + return nil, err + } + a.client = client + } + url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.SetBasicAuth(a.Username, a.Password) + resp, err := a.client.Do(req) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + +func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) { + for _, queue := range queues.QueueItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = strings.TrimSpace(queue.Name) + tags["source"] = a.Server + tags["port"] = strconv.Itoa(a.Port) + + records["size"] = queue.Stats.Size + records["consumer_count"] = queue.Stats.ConsumerCount + records["enqueue_count"] = queue.Stats.EnqueueCount + records["dequeue_count"] = queue.Stats.DequeueCount + + acc.AddFields("activemq_queues", records, tags) + } +} + +func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) { + for _, topic := range topics.TopicItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = topic.Name + tags["source"] = a.Server + tags["port"] = strconv.Itoa(a.Port) + + records["size"] = topic.Stats.Size + records["consumer_count"] = topic.Stats.ConsumerCount + records["enqueue_count"] = topic.Stats.EnqueueCount + records["dequeue_count"] = topic.Stats.DequeueCount + + acc.AddFields("activemq_topics", records, tags) + } +} + +func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) { + for _, subscriber := range subscribers.SubscriberItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["client_id"] = subscriber.ClientId + tags["subscription_name"] = subscriber.SubscriptionName + tags["connection_id"] = subscriber.ConnectionId + tags["destination_name"] = subscriber.DestinationName + tags["selector"] = subscriber.Selector + tags["active"] = subscriber.Active + tags["source"] = a.Server + tags["port"] = strconv.Itoa(a.Port) + + records["pending_queue_size"] = subscriber.Stats.PendingQueueSize + records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize + records["dispatched_counter"] = subscriber.Stats.DispatchedCounter + records["enqueue_counter"] = subscriber.Stats.EnqueueCounter + records["dequeue_counter"] = subscriber.Stats.DequeueCounter + + acc.AddFields("activemq_subscribers", records, tags) + } +} + +func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { + dataQueues, err := a.GetMetrics(QUEUES_STATS) + queues := Queues{} + err = xml.Unmarshal(dataQueues, &queues) + if err != nil { + return err + } + + dataTopics, err := a.GetMetrics(TOPICS_STATS) + topics := Topics{} + err = xml.Unmarshal(dataTopics, &topics) + if err != nil { + return err + } + + dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) + subscribers := Subscribers{} + err = xml.Unmarshal(dataSubscribers, &subscribers) + if err != nil { + return err + } + + a.GatherQueuesMetrics(acc, queues) + a.GatherTopicsMetrics(acc, topics) + a.GatherSubscribersMetrics(acc, subscribers) + + return nil +} + +func init() { + inputs.Add("activemq", func() telegraf.Input { + return &ActiveMQ{ + Server: "localhost", + Port: 8161, + } + }) +} diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go new file mode 100644 index 000000000..c277af3c5 --- /dev/null +++ b/plugins/inputs/activemq/activemq_test.go @@ -0,0 +1,139 @@ +package activemq + +import ( + "encoding/xml" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func TestGatherQueuesMetrics(t *testing.T) { + + s := ` + + + +queueBrowse/sandra?view=rss&feedType=atom_1.0 +queueBrowse/sandra?view=rss&feedType=rss_2.0 + + + + + +queueBrowse/Test?view=rss&feedType=atom_1.0 +queueBrowse/Test?view=rss&feedType=rss_2.0 + + +` + + queues := Queues{} + + xml.Unmarshal([]byte(s), &queues) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = "Test" + tags["source"] = "localhost" + tags["port"] = "8161" + + records["size"] = 0 + records["consumer_count"] = 0 + records["enqueue_count"] = 0 + records["dequeue_count"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 + + activeMQ.GatherQueuesMetrics(&acc, queues) + acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) +} + +func TestGatherTopicsMetrics(t *testing.T) { + + s := ` + + + + + + + + + + + + + + + +` + + topics := Topics{} + + xml.Unmarshal([]byte(s), &topics) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = "ActiveMQ.Advisory.MasterBroker " + tags["source"] = "localhost" + tags["port"] = "8161" + + records["size"] = 0 + records["consumer_count"] = 0 + records["enqueue_count"] = 1 + records["dequeue_count"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 + + activeMQ.GatherTopicsMetrics(&acc, topics) + acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) +} + +func TestGatherSubscribersMetrics(t *testing.T) { + + s := ` + + + +` + + subscribers := Subscribers{} + + xml.Unmarshal([]byte(s), &subscribers) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["client_id"] = "AAA" + tags["subscription_name"] = "AAA" + tags["connection_id"] = "NOTSET" + tags["destination_name"] = "AAA" + tags["selector"] = "AA" + tags["active"] = "no" + tags["source"] = "localhost" + tags["port"] = "8161" + + records["pending_queue_size"] = 0 + records["dispatched_queue_size"] = 0 + records["dispatched_counter"] = 0 + records["enqueue_counter"] = 0 + records["dequeue_counter"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 + + activeMQ.GatherSubscribersMetrics(&acc, subscribers) + acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) +} diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index bbca99521..ac86fb879 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/inputs/activemq" _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/apache"