Fix https support in activemq input (#6092)

This commit is contained in:
Daniel Nelson 2019-07-09 10:40:14 -07:00 committed by GitHub
parent 601f499126
commit 130c5c5f12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 163 additions and 70 deletions

View File

@ -1,4 +1,4 @@
# Telegraf Input Plugin: ActiveMQ
# ActiveMQ Input Plugin
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.
@ -7,10 +7,12 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A
```toml
# Description
[[inputs.activemq]]
## Required ActiveMQ Endpoint
# server = "192.168.50.10"
## ActiveMQ WebConsole URL
url = "http://127.0.0.1:8161"
## Required ActiveMQ port
## Required ActiveMQ Endpoint
## deprecated in 1.11; use the url option
# server = "192.168.50.10"
# port = 8161
## Credentials for basic HTTP authentication
@ -28,40 +30,35 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
### Measurements & Fields:
### Metrics
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.
- activemq_queues:
- activemq_queues
- tags:
- name
- source
- port
- fields:
- size
- consumer_count
- enqueue_count
- dequeue_count
- activemq_topics:
+ activemq_topics
- tags:
- name
- source
- port
- fields:
- size
- consumer_count
- enqueue_count
- dequeue_count
- subscribers_metrics:
- pending_queue_size
- dispatched_queue_size
- dispatched_counter
- enqueue_counter
- dequeue_counter
### Tags:
- activemq_queues:
- name
- source
- port
- activemq_topics:
- name
- source
- port
- activemq_subscribers:
- activemq_subscribers
- tags:
- client_id
- subscription_name
- connection_id
@ -70,11 +67,16 @@ Every effort was made to preserve the names based on the XML response from the A
- active
- source
- port
- fields:
- pending_queue_size
- dispatched_queue_size
- dispatched_counter
- enqueue_counter
- dequeue_counter
### Example Output:
### Example Output
```
$ ./telegraf -config telegraf.conf -input-filter activemq -test
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000

View File

@ -5,10 +5,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"time"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@ -17,15 +18,17 @@ import (
)
type ActiveMQ struct {
Server string `json:"server"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Webadmin string `json:"webadmin"`
ResponseTimeout internal.Duration
Server string `toml:"server"`
Port int `toml:"port"`
URL string `toml:"url"`
Username string `toml:"username"`
Password string `toml:"password"`
Webadmin string `toml:"webadmin"`
ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
client *http.Client
baseURL *url.URL
}
type Topics struct {
@ -79,17 +82,13 @@ type Stats struct {
DequeueCounter int `xml:"dequeueCounter,attr"`
}
const (
QUEUES_STATS = "queues"
TOPICS_STATS = "topics"
SUBSCRIBERS_STATS = "subscribers"
)
var sampleConfig = `
## Required ActiveMQ Endpoint
# server = "192.168.50.10"
## ActiveMQ WebConsole URL
url = "http://127.0.0.1:8161"
## Required ActiveMQ port
## Required ActiveMQ Endpoint
## deprecated in 1.11; use the url option
# server = "127.0.0.1"
# port = 8161
## Credentials for basic HTTP authentication
@ -107,6 +106,7 @@ var sampleConfig = `
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (a *ActiveMQ) Description() string {
@ -133,32 +133,57 @@ func (a *ActiveMQ) createHttpClient() (*http.Client, error) {
return client, nil
}
func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
func (a *ActiveMQ) Init() error {
if a.ResponseTimeout.Duration < time.Second {
a.ResponseTimeout.Duration = time.Second * 5
}
if a.client == nil {
client, err := a.createHttpClient()
var err error
u := &url.URL{Scheme: "http", Host: a.Server + ":" + strconv.Itoa(a.Port)}
if a.URL != "" {
u, err = url.Parse(a.URL)
if err != nil {
return nil, err
return err
}
a.client = client
}
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)
req, err := http.NewRequest("GET", url, nil)
if !strings.HasPrefix(u.Scheme, "http") {
return fmt.Errorf("invalid scheme %q", u.Scheme)
}
if u.Hostname() == "" {
return fmt.Errorf("invalid hostname %q", u.Hostname())
}
a.baseURL = u
a.client, err = a.createHttpClient()
if err != nil {
return err
}
return nil
}
func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) {
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return nil, err
}
if a.Username != "" || a.Password != "" {
req.SetBasicAuth(a.Username, a.Password)
}
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GET %s returned status %q", u, resp.Status)
}
return ioutil.ReadAll(resp.Body)
}
@ -168,8 +193,8 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues)
tags := make(map[string]string)
tags["name"] = strings.TrimSpace(queue.Name)
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)
tags["source"] = a.baseURL.Hostname()
tags["port"] = a.baseURL.Port()
records["size"] = queue.Stats.Size
records["consumer_count"] = queue.Stats.ConsumerCount
@ -186,8 +211,8 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics)
tags := make(map[string]string)
tags["name"] = topic.Name
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)
tags["source"] = a.baseURL.Hostname()
tags["port"] = a.baseURL.Port()
records["size"] = topic.Stats.Size
records["consumer_count"] = topic.Stats.ConsumerCount
@ -209,8 +234,8 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber
tags["destination_name"] = subscriber.DestinationName
tags["selector"] = subscriber.Selector
tags["active"] = subscriber.Active
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)
tags["source"] = a.baseURL.Hostname()
tags["port"] = a.baseURL.Port()
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
@ -223,25 +248,34 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber
}
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
dataQueues, err := a.GetMetrics(QUEUES_STATS)
dataQueues, err := a.GetMetrics(a.QueuesURL())
if err != nil {
return err
}
queues := Queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
return err
return fmt.Errorf("queues XML unmarshal error: %v", err)
}
dataTopics, err := a.GetMetrics(TOPICS_STATS)
dataTopics, err := a.GetMetrics(a.TopicsURL())
if err != nil {
return err
}
topics := Topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
return err
return fmt.Errorf("topics XML unmarshal error: %v", err)
}
dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
dataSubscribers, err := a.GetMetrics(a.SubscribersURL())
if err != nil {
return err
}
subscribers := Subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
return err
return fmt.Errorf("subscribers XML unmarshal error: %v", err)
}
a.GatherQueuesMetrics(acc, queues)
@ -251,11 +285,27 @@ func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
return nil
}
func (a *ActiveMQ) QueuesURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}
func (a *ActiveMQ) TopicsURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}
func (a *ActiveMQ) SubscribersURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}
func init() {
inputs.Add("activemq", func() telegraf.Input {
return &ActiveMQ{
Server: "localhost",
Port: 8161,
Webadmin: "admin",
}
})
}

View File

@ -2,9 +2,12 @@ package activemq
import (
"encoding/xml"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestGatherQueuesMetrics(t *testing.T) {
@ -47,6 +50,7 @@ func TestGatherQueuesMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
activeMQ.GatherQueuesMetrics(&acc, queues)
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
@ -93,6 +97,7 @@ func TestGatherTopicsMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
activeMQ.GatherTopicsMetrics(&acc, topics)
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
@ -133,7 +138,43 @@ func TestGatherSubscribersMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
activeMQ.GatherSubscribersMetrics(&acc, subscribers)
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
}
func TestURLs(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/admin/xml/queues.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<queues></queues>"))
case "/admin/xml/topics.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<topics></topics>"))
case "/admin/xml/subscribers.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<subscribers></subscribers>"))
default:
w.WriteHeader(http.StatusNotFound)
t.Fatalf("unexpected path: " + r.URL.Path)
}
})
plugin := ActiveMQ{
URL: "http://" + ts.Listener.Addr().String(),
Webadmin: "admin",
}
err := plugin.Init()
require.NoError(t, err)
var acc testutil.Accumulator
err = plugin.Gather(&acc)
require.NoError(t, err)
require.Len(t, acc.GetTelegrafMetrics(), 0)
}