diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md
index b44d12d22..aba5a7f83 100644
--- a/plugins/inputs/activemq/README.md
+++ b/plugins/inputs/activemq/README.md
@@ -1,4 +1,4 @@
-# Telegraf Input Plugin: ActiveMQ
+# ActiveMQ Input Plugin
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.
@@ -7,12 +7,14 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A
```toml
# Description
[[inputs.activemq]]
- ## Required ActiveMQ Endpoint
- # server = "192.168.50.10"
+ ## ActiveMQ WebConsole URL
+ url = "http://127.0.0.1:8161"
- ## Required ActiveMQ port
+ ## Required ActiveMQ Endpoint
+ ## deprecated in 1.11; use the url option
+ # server = "192.168.50.10"
# port = 8161
-
+
## Credentials for basic HTTP authentication
# username = "admin"
# password = "admin"
@@ -22,46 +24,41 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A
## Maximum time to receive response.
# response_timeout = "5s"
-
+
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
+ # insecure_skip_verify = false
```
-### Measurements & Fields:
+### Metrics
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.
-- activemq_queues:
+- activemq_queues
+ - tags:
+ - name
+ - source
+ - port
+ - fields:
- size
- consumer_count
- enqueue_count
- dequeue_count
- - activemq_topics:
++ activemq_topics
+ - tags:
+ - name
+ - source
+ - port
+ - fields:
- size
- consumer_count
- enqueue_count
- dequeue_count
- - subscribers_metrics:
- - pending_queue_size
- - dispatched_queue_size
- - dispatched_counter
- - enqueue_counter
- - dequeue_counter
-
-### Tags:
-
-- activemq_queues:
- - name
- - source
- - port
-- activemq_topics:
- - name
- - source
- - port
-- activemq_subscribers:
+- activemq_subscribers
+ - tags:
- client_id
- subscription_name
- connection_id
@@ -70,11 +67,16 @@ Every effort was made to preserve the names based on the XML response from the A
- active
- source
- port
+ - fields:
+ - pending_queue_size
+ - dispatched_queue_size
+ - dispatched_counter
+ - enqueue_counter
+ - dequeue_counter
-### Example Output:
+### Example Output
```
-$ ./telegraf -config telegraf.conf -input-filter activemq -test
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000
diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go
index 9cc9037ed..9d08661b7 100644
--- a/plugins/inputs/activemq/activemq.go
+++ b/plugins/inputs/activemq/activemq.go
@@ -5,10 +5,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
+ "net/url"
+ "path"
"strconv"
- "time"
-
"strings"
+ "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -17,15 +18,17 @@ import (
)
type ActiveMQ struct {
- Server string `json:"server"`
- Port int `json:"port"`
- Username string `json:"username"`
- Password string `json:"password"`
- Webadmin string `json:"webadmin"`
- ResponseTimeout internal.Duration
+ Server string `toml:"server"`
+ Port int `toml:"port"`
+ URL string `toml:"url"`
+ Username string `toml:"username"`
+ Password string `toml:"password"`
+ Webadmin string `toml:"webadmin"`
+ ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
- client *http.Client
+ client *http.Client
+ baseURL *url.URL
}
type Topics struct {
@@ -79,17 +82,13 @@ type Stats struct {
DequeueCounter int `xml:"dequeueCounter,attr"`
}
-const (
- QUEUES_STATS = "queues"
- TOPICS_STATS = "topics"
- SUBSCRIBERS_STATS = "subscribers"
-)
-
var sampleConfig = `
- ## Required ActiveMQ Endpoint
- # server = "192.168.50.10"
+ ## ActiveMQ WebConsole URL
+ url = "http://127.0.0.1:8161"
- ## Required ActiveMQ port
+ ## Required ActiveMQ Endpoint
+ ## deprecated in 1.11; use the url option
+ # server = "127.0.0.1"
# port = 8161
## Credentials for basic HTTP authentication
@@ -107,6 +106,7 @@ var sampleConfig = `
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
+ # insecure_skip_verify = false
`
func (a *ActiveMQ) Description() string {
@@ -133,32 +133,57 @@ func (a *ActiveMQ) createHttpClient() (*http.Client, error) {
return client, nil
}
-func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
+func (a *ActiveMQ) Init() error {
if a.ResponseTimeout.Duration < time.Second {
a.ResponseTimeout.Duration = time.Second * 5
}
- if a.client == nil {
- client, err := a.createHttpClient()
+ var err error
+ u := &url.URL{Scheme: "http", Host: a.Server + ":" + strconv.Itoa(a.Port)}
+ if a.URL != "" {
+ u, err = url.Parse(a.URL)
if err != nil {
- return nil, err
+ return err
}
- a.client = client
}
- url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)
- req, err := http.NewRequest("GET", url, nil)
+ if !strings.HasPrefix(u.Scheme, "http") {
+ return fmt.Errorf("invalid scheme %q", u.Scheme)
+ }
+
+ if u.Hostname() == "" {
+ return fmt.Errorf("invalid hostname %q", u.Hostname())
+ }
+
+ a.baseURL = u
+
+ a.client, err = a.createHttpClient()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) {
+ req, err := http.NewRequest("GET", u, nil)
if err != nil {
return nil, err
}
- req.SetBasicAuth(a.Username, a.Password)
+ if a.Username != "" || a.Password != "" {
+ req.SetBasicAuth(a.Username, a.Password)
+ }
+
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
-
defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("GET %s returned status %q", u, resp.Status)
+ }
+
return ioutil.ReadAll(resp.Body)
}
@@ -168,8 +193,8 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues)
tags := make(map[string]string)
tags["name"] = strings.TrimSpace(queue.Name)
- tags["source"] = a.Server
- tags["port"] = strconv.Itoa(a.Port)
+ tags["source"] = a.baseURL.Hostname()
+ tags["port"] = a.baseURL.Port()
records["size"] = queue.Stats.Size
records["consumer_count"] = queue.Stats.ConsumerCount
@@ -186,8 +211,8 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics)
tags := make(map[string]string)
tags["name"] = topic.Name
- tags["source"] = a.Server
- tags["port"] = strconv.Itoa(a.Port)
+ tags["source"] = a.baseURL.Hostname()
+ tags["port"] = a.baseURL.Port()
records["size"] = topic.Stats.Size
records["consumer_count"] = topic.Stats.ConsumerCount
@@ -209,8 +234,8 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber
tags["destination_name"] = subscriber.DestinationName
tags["selector"] = subscriber.Selector
tags["active"] = subscriber.Active
- tags["source"] = a.Server
- tags["port"] = strconv.Itoa(a.Port)
+ tags["source"] = a.baseURL.Hostname()
+ tags["port"] = a.baseURL.Port()
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
@@ -223,25 +248,34 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber
}
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
- dataQueues, err := a.GetMetrics(QUEUES_STATS)
+ dataQueues, err := a.GetMetrics(a.QueuesURL())
+ if err != nil {
+ return err
+ }
queues := Queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
- return err
+ return fmt.Errorf("queues XML unmarshal error: %v", err)
}
- dataTopics, err := a.GetMetrics(TOPICS_STATS)
+ dataTopics, err := a.GetMetrics(a.TopicsURL())
+ if err != nil {
+ return err
+ }
topics := Topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
- return err
+ return fmt.Errorf("topics XML unmarshal error: %v", err)
}
- dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
+ dataSubscribers, err := a.GetMetrics(a.SubscribersURL())
+ if err != nil {
+ return err
+ }
subscribers := Subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
- return err
+ return fmt.Errorf("subscribers XML unmarshal error: %v", err)
}
a.GatherQueuesMetrics(acc, queues)
@@ -251,11 +285,27 @@ func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
return nil
}
+func (a *ActiveMQ) QueuesURL() string {
+ ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")}
+ return a.baseURL.ResolveReference(&ref).String()
+}
+
+func (a *ActiveMQ) TopicsURL() string {
+ ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")}
+ return a.baseURL.ResolveReference(&ref).String()
+}
+
+func (a *ActiveMQ) SubscribersURL() string {
+ ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")}
+ return a.baseURL.ResolveReference(&ref).String()
+}
+
func init() {
inputs.Add("activemq", func() telegraf.Input {
return &ActiveMQ{
- Server: "localhost",
- Port: 8161,
+ Server: "localhost",
+ Port: 8161,
+ Webadmin: "admin",
}
})
}
diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go
index c277af3c5..407a38177 100644
--- a/plugins/inputs/activemq/activemq_test.go
+++ b/plugins/inputs/activemq/activemq_test.go
@@ -2,9 +2,12 @@ package activemq
import (
"encoding/xml"
+ "net/http"
+ "net/http/httptest"
"testing"
"github.com/influxdata/telegraf/testutil"
+ "github.com/stretchr/testify/require"
)
func TestGatherQueuesMetrics(t *testing.T) {
@@ -47,6 +50,7 @@ func TestGatherQueuesMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
+ activeMQ.Init()
activeMQ.GatherQueuesMetrics(&acc, queues)
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
@@ -93,6 +97,7 @@ func TestGatherTopicsMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
+ activeMQ.Init()
activeMQ.GatherTopicsMetrics(&acc, topics)
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
@@ -133,7 +138,43 @@ func TestGatherSubscribersMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
+ activeMQ.Init()
activeMQ.GatherSubscribersMetrics(&acc, subscribers)
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
}
+
+func TestURLs(t *testing.T) {
+ ts := httptest.NewServer(http.NotFoundHandler())
+ defer ts.Close()
+
+ ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/admin/xml/queues.jsp":
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(""))
+ case "/admin/xml/topics.jsp":
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(""))
+ case "/admin/xml/subscribers.jsp":
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(""))
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ t.Fatalf("unexpected path: " + r.URL.Path)
+ }
+ })
+
+ plugin := ActiveMQ{
+ URL: "http://" + ts.Listener.Addr().String(),
+ Webadmin: "admin",
+ }
+ err := plugin.Init()
+ require.NoError(t, err)
+
+ var acc testutil.Accumulator
+ err = plugin.Gather(&acc)
+ require.NoError(t, err)
+
+ require.Len(t, acc.GetTelegrafMetrics(), 0)
+}