From 130c5c5f12f85a62df7841f19632d3d8d2286a7b Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 9 Jul 2019 10:40:14 -0700 Subject: [PATCH] Fix https support in activemq input (#6092) --- plugins/inputs/activemq/README.md | 60 ++++++----- plugins/inputs/activemq/activemq.go | 132 ++++++++++++++++------- plugins/inputs/activemq/activemq_test.go | 41 +++++++ 3 files changed, 163 insertions(+), 70 deletions(-) diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md index b44d12d22..aba5a7f83 100644 --- a/plugins/inputs/activemq/README.md +++ b/plugins/inputs/activemq/README.md @@ -1,4 +1,4 @@ -# Telegraf Input Plugin: ActiveMQ +# ActiveMQ Input Plugin This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. @@ -7,12 +7,14 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A ```toml # Description [[inputs.activemq]] - ## Required ActiveMQ Endpoint - # server = "192.168.50.10" + ## ActiveMQ WebConsole URL + url = "http://127.0.0.1:8161" - ## Required ActiveMQ port + ## Required ActiveMQ Endpoint + ## deprecated in 1.11; use the url option + # server = "192.168.50.10" # port = 8161 - + ## Credentials for basic HTTP authentication # username = "admin" # password = "admin" @@ -22,46 +24,41 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A ## Maximum time to receive response. # response_timeout = "5s" - + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ``` -### Measurements & Fields: +### Metrics Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. -- activemq_queues: +- activemq_queues + - tags: + - name + - source + - port + - fields: - size - consumer_count - enqueue_count - dequeue_count - - activemq_topics: ++ activemq_topics + - tags: + - name + - source + - port + - fields: - size - consumer_count - enqueue_count - dequeue_count - - subscribers_metrics: - - pending_queue_size - - dispatched_queue_size - - dispatched_counter - - enqueue_counter - - dequeue_counter - -### Tags: - -- activemq_queues: - - name - - source - - port -- activemq_topics: - - name - - source - - port -- activemq_subscribers: +- activemq_subscribers + - tags: - client_id - subscription_name - connection_id @@ -70,11 +67,16 @@ Every effort was made to preserve the names based on the XML response from the A - active - source - port + - fields: + - pending_queue_size + - dispatched_queue_size + - dispatched_counter + - enqueue_counter + - dequeue_counter -### Example Output: +### Example Output ``` -$ ./telegraf -config telegraf.conf -input-filter activemq -test activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index 9cc9037ed..9d08661b7 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -5,10 +5,11 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" + "path" "strconv" - "time" - "strings" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -17,15 +18,17 @@ import ( ) type ActiveMQ struct { - Server string `json:"server"` - Port int `json:"port"` - Username string `json:"username"` - Password string `json:"password"` - Webadmin string `json:"webadmin"` - ResponseTimeout internal.Duration + Server string `toml:"server"` + Port int `toml:"port"` + URL string `toml:"url"` + Username string `toml:"username"` + Password string `toml:"password"` + Webadmin string `toml:"webadmin"` + ResponseTimeout internal.Duration `toml:"response_timeout"` tls.ClientConfig - client *http.Client + client *http.Client + baseURL *url.URL } type Topics struct { @@ -79,17 +82,13 @@ type Stats struct { DequeueCounter int `xml:"dequeueCounter,attr"` } -const ( - QUEUES_STATS = "queues" - TOPICS_STATS = "topics" - SUBSCRIBERS_STATS = "subscribers" -) - var sampleConfig = ` - ## Required ActiveMQ Endpoint - # server = "192.168.50.10" + ## ActiveMQ WebConsole URL + url = "http://127.0.0.1:8161" - ## Required ActiveMQ port + ## Required ActiveMQ Endpoint + ## deprecated in 1.11; use the url option + # server = "127.0.0.1" # port = 8161 ## Credentials for basic HTTP authentication @@ -107,6 +106,7 @@ var sampleConfig = ` # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ` func (a *ActiveMQ) Description() string { @@ -133,32 +133,57 @@ func (a *ActiveMQ) createHttpClient() (*http.Client, error) { return client, nil } -func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { +func (a *ActiveMQ) Init() error { if a.ResponseTimeout.Duration < time.Second { a.ResponseTimeout.Duration = time.Second * 5 } - if a.client == nil { - client, err := a.createHttpClient() + var err error + u := &url.URL{Scheme: "http", Host: a.Server + ":" + strconv.Itoa(a.Port)} + if a.URL != "" { + u, err = url.Parse(a.URL) if err != nil { - return nil, err + return err } - a.client = client } - url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) - req, err := http.NewRequest("GET", url, nil) + if !strings.HasPrefix(u.Scheme, "http") { + return fmt.Errorf("invalid scheme %q", u.Scheme) + } + + if u.Hostname() == "" { + return fmt.Errorf("invalid hostname %q", u.Hostname()) + } + + a.baseURL = u + + a.client, err = a.createHttpClient() + if err != nil { + return err + } + return nil +} + +func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) { + req, err := http.NewRequest("GET", u, nil) if err != nil { return nil, err } - req.SetBasicAuth(a.Username, a.Password) + if a.Username != "" || a.Password != "" { + req.SetBasicAuth(a.Username, a.Password) + } + resp, err := a.client.Do(req) if err != nil { return nil, err } - defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET %s returned status %q", u, resp.Status) + } + return ioutil.ReadAll(resp.Body) } @@ -168,8 +193,8 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) tags := make(map[string]string) tags["name"] = strings.TrimSpace(queue.Name) - tags["source"] = a.Server - tags["port"] = strconv.Itoa(a.Port) + tags["source"] = a.baseURL.Hostname() + tags["port"] = a.baseURL.Port() records["size"] = queue.Stats.Size records["consumer_count"] = queue.Stats.ConsumerCount @@ -186,8 +211,8 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) tags := make(map[string]string) tags["name"] = topic.Name - tags["source"] = a.Server - tags["port"] = strconv.Itoa(a.Port) + tags["source"] = a.baseURL.Hostname() + tags["port"] = a.baseURL.Port() records["size"] = topic.Stats.Size records["consumer_count"] = topic.Stats.ConsumerCount @@ -209,8 +234,8 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber tags["destination_name"] = subscriber.DestinationName tags["selector"] = subscriber.Selector tags["active"] = subscriber.Active - tags["source"] = a.Server - tags["port"] = strconv.Itoa(a.Port) + tags["source"] = a.baseURL.Hostname() + tags["port"] = a.baseURL.Port() records["pending_queue_size"] = subscriber.Stats.PendingQueueSize records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize @@ -223,25 +248,34 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber } func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { - dataQueues, err := a.GetMetrics(QUEUES_STATS) + dataQueues, err := a.GetMetrics(a.QueuesURL()) + if err != nil { + return err + } queues := Queues{} err = xml.Unmarshal(dataQueues, &queues) if err != nil { - return err + return fmt.Errorf("queues XML unmarshal error: %v", err) } - dataTopics, err := a.GetMetrics(TOPICS_STATS) + dataTopics, err := a.GetMetrics(a.TopicsURL()) + if err != nil { + return err + } topics := Topics{} err = xml.Unmarshal(dataTopics, &topics) if err != nil { - return err + return fmt.Errorf("topics XML unmarshal error: %v", err) } - dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) + dataSubscribers, err := a.GetMetrics(a.SubscribersURL()) + if err != nil { + return err + } subscribers := Subscribers{} err = xml.Unmarshal(dataSubscribers, &subscribers) if err != nil { - return err + return fmt.Errorf("subscribers XML unmarshal error: %v", err) } a.GatherQueuesMetrics(acc, queues) @@ -251,11 +285,27 @@ func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { return nil } +func (a *ActiveMQ) QueuesURL() string { + ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")} + return a.baseURL.ResolveReference(&ref).String() +} + +func (a *ActiveMQ) TopicsURL() string { + ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")} + return a.baseURL.ResolveReference(&ref).String() +} + +func (a *ActiveMQ) SubscribersURL() string { + ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")} + return a.baseURL.ResolveReference(&ref).String() +} + func init() { inputs.Add("activemq", func() telegraf.Input { return &ActiveMQ{ - Server: "localhost", - Port: 8161, + Server: "localhost", + Port: 8161, + Webadmin: "admin", } }) } diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index c277af3c5..407a38177 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -2,9 +2,12 @@ package activemq import ( "encoding/xml" + "net/http" + "net/http/httptest" "testing" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) func TestGatherQueuesMetrics(t *testing.T) { @@ -47,6 +50,7 @@ func TestGatherQueuesMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.Server = "localhost" activeMQ.Port = 8161 + activeMQ.Init() activeMQ.GatherQueuesMetrics(&acc, queues) acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) @@ -93,6 +97,7 @@ func TestGatherTopicsMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.Server = "localhost" activeMQ.Port = 8161 + activeMQ.Init() activeMQ.GatherTopicsMetrics(&acc, topics) acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) @@ -133,7 +138,43 @@ func TestGatherSubscribersMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.Server = "localhost" activeMQ.Port = 8161 + activeMQ.Init() activeMQ.GatherSubscribersMetrics(&acc, subscribers) acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) } + +func TestURLs(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/admin/xml/queues.jsp": + w.WriteHeader(http.StatusOK) + w.Write([]byte("")) + case "/admin/xml/topics.jsp": + w.WriteHeader(http.StatusOK) + w.Write([]byte("")) + case "/admin/xml/subscribers.jsp": + w.WriteHeader(http.StatusOK) + w.Write([]byte("")) + default: + w.WriteHeader(http.StatusNotFound) + t.Fatalf("unexpected path: " + r.URL.Path) + } + }) + + plugin := ActiveMQ{ + URL: "http://" + ts.Listener.Addr().String(), + Webadmin: "admin", + } + err := plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + err = plugin.Gather(&acc) + require.NoError(t, err) + + require.Len(t, acc.GetTelegrafMetrics(), 0) +}