Add redesigned Jolokia input plugin (#2278)

This commit is contained in:
Dylan Meissner 2017-09-26 17:34:46 -07:00 committed by Daniel Nelson
parent cadafa6405
commit ee26191eb5
14 changed files with 2313 additions and 0 deletions

View File

@ -36,6 +36,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia2"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"

View File

@ -0,0 +1,169 @@
# Jolokia2 Input Plugins
The [Jolokia](http://jolokia.org) _agent_ and _proxy_ input plugins collect JMX metrics from an HTTP endpoint using Jolokia's [JSON-over-HTTP protocol](https://jolokia.org/reference/html/protocol.html).
## Jolokia Agent Configuration
The `jolokia2_agent` input plugin reads JMX metrics from one or more [Jolokia agent](https://jolokia.org/agent/jvm.html) REST endpoints.
```toml
[[inputs.jolokia2_agent]]
urls = ["http://agent:8080/jolokia"]
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
```
Optionally, specify SSL options for communicating with agents:
```toml
[[inputs.jolokia2_agent]]
urls = ["https://agent:8080/jolokia"]
ssl_ca = "/var/private/ca.pem"
ssl_cert = "/var/private/client.pem"
ssl_key = "/var/private/client-key.pem"
#insecure_skip_verify = false
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
```
## Jolokia Proxy Configuration
The `jolokia2_proxy` input plugin reads JMX metrics from one or more _targets_ by interacting with a [Jolokia proxy](https://jolokia.org/features/proxy.html) REST endpoint.
```toml
[[inputs.jolokia2_proxy]]
url = "http://proxy:8080/jolokia"
#default_target_username = ""
#default_target_password = ""
[[inputs.jolokia2_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi"
# username = ""
# password = ""
[[inputs.jolokia2_proxy.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
```
Optionally, specify SSL options for communicating with proxies:
```toml
[[inputs.jolokia2_proxy]]
url = "https://proxy:8080/jolokia"
ssl_ca = "/var/private/ca.pem"
ssl_cert = "/var/private/client.pem"
ssl_key = "/var/private/client-key.pem"
#insecure_skip_verify = false
#default_target_username = ""
#default_target_password = ""
[[inputs.jolokia2_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi"
# username = ""
# password = ""
[[inputs.jolokia2_agent.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
```
## Jolokia Metric Configuration
Each `metric` declaration generates a Jolokia request to fetch telemetry from a JMX MBean.
| Key | Required | Description |
|----------------|----------|-------------|
| `mbean` | yes | The object name of a JMX MBean. MBean property-key values can contain a wildcard `*`, allowing you to fetch multiple MBeans with one declaration. |
| `paths` | no | A list of MBean attributes to read. |
| `tag_keys` | no | A list of MBean property-key names to convert into tags. The property-key name becomes the tag name, while the property-key value becomes the tag value. |
| `tag_prefix` | no | A string to prepend to the tag names produced by this `metric` declaration. |
| `field_name` | no | A string to set as the name of the field produced by this metric; can contain substitutions. |
| `field_prefix` | no | A string to prepend to the field names produced by this `metric` declaration; can contain substitutions. |
Use `paths` to refine which fields to collect.
```toml
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
```
The preceeding `jvm_memory` `metric` declaration produces the following output:
```
jvm_memory HeapMemoryUsage.committed=4294967296,HeapMemoryUsage.init=4294967296,HeapMemoryUsage.max=4294967296,HeapMemoryUsage.used=1750658992,NonHeapMemoryUsage.committed=67350528,NonHeapMemoryUsage.init=2555904,NonHeapMemoryUsage.max=-1,NonHeapMemoryUsage.used=65821352,ObjectPendingFinalizationCount=0 1503762436000000000
```
Use `*` wildcards against `mbean` property-key values to create distinct series by capturing values into `tag_keys`.
```toml
[[inputs.jolokia2_agent.metric]]
name = "jvm_garbage_collector"
mbean = "java.lang:name=*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
```
Since `name=*` matches both `G1 Old Generation` and `G1 Young Generation`, and `name` is used as a tag, the preceeding `jvm_garbage_collector` `metric` declaration produces two metrics.
```
jvm_garbage_collector,name=G1\ Old\ Generation CollectionCount=0,CollectionTime=0 1503762520000000000
jvm_garbage_collector,name=G1\ Young\ Generation CollectionTime=32,CollectionCount=2 1503762520000000000
```
Use `tag_prefix` along with `tag_keys` to add detail to tag names.
```toml
[[inputs.jolokia2_agent.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]
tag_prefix = "pool_"
```
The preceeding `jvm_memory_pool` `metric` declaration produces six metrics, each with a distinct `pool_name` tag.
```
jvm_memory_pool,pool_name=Compressed\ Class\ Space PeakUsage.max=1073741824,PeakUsage.committed=3145728,PeakUsage.init=0,Usage.committed=3145728,Usage.init=0,PeakUsage.used=3017976,Usage.max=1073741824,Usage.used=3017976 1503764025000000000
jvm_memory_pool,pool_name=Code\ Cache PeakUsage.init=2555904,PeakUsage.committed=6291456,Usage.committed=6291456,PeakUsage.used=6202752,PeakUsage.max=251658240,Usage.used=6210368,Usage.max=251658240,Usage.init=2555904 1503764025000000000
jvm_memory_pool,pool_name=G1\ Eden\ Space CollectionUsage.max=-1,PeakUsage.committed=56623104,PeakUsage.init=56623104,PeakUsage.used=53477376,Usage.max=-1,Usage.committed=49283072,Usage.used=19922944,CollectionUsage.committed=49283072,CollectionUsage.init=56623104,CollectionUsage.used=0,PeakUsage.max=-1,Usage.init=56623104 1503764025000000000
jvm_memory_pool,pool_name=G1\ Old\ Gen CollectionUsage.max=1073741824,CollectionUsage.committed=0,PeakUsage.max=1073741824,PeakUsage.committed=1017118720,PeakUsage.init=1017118720,PeakUsage.used=137032208,Usage.max=1073741824,CollectionUsage.init=1017118720,Usage.committed=1017118720,Usage.init=1017118720,Usage.used=134708752,CollectionUsage.used=0 1503764025000000000
jvm_memory_pool,pool_name=G1\ Survivor\ Space Usage.max=-1,Usage.init=0,CollectionUsage.max=-1,CollectionUsage.committed=7340032,CollectionUsage.used=7340032,PeakUsage.committed=7340032,Usage.committed=7340032,Usage.used=7340032,CollectionUsage.init=0,PeakUsage.max=-1,PeakUsage.init=0,PeakUsage.used=7340032 1503764025000000000
jvm_memory_pool,pool_name=Metaspace PeakUsage.init=0,PeakUsage.used=21852224,PeakUsage.max=-1,Usage.max=-1,Usage.committed=22282240,Usage.init=0,Usage.used=21852224,PeakUsage.committed=22282240 1503764025000000000
```
Use substitutions to create fields and field prefixes with MBean property-keys captured by wildcards. In the following example, `$1` represents the value of the property-key `name`, and `$2` represents the value of the property-key `topic`.
```toml
[[inputs.jolokia2_agent.metric]]
name = "kafka_topic"
mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics"
field_prefix = "$1"
tag_keys = ["topic"]
```
The preceeding `kafka_topic` `metric` declaration produces a metric per Kafka topic. The `name` Mbean property-key is used as a field prefix to aid in gathering fields together into the single metric.
```
kafka_topic,topic=my-topic BytesOutPerSec.MeanRate=0,FailedProduceRequestsPerSec.MeanRate=0,BytesOutPerSec.EventType="bytes",BytesRejectedPerSec.Count=0,FailedProduceRequestsPerSec.RateUnit="SECONDS",FailedProduceRequestsPerSec.EventType="requests",MessagesInPerSec.RateUnit="SECONDS",BytesInPerSec.EventType="bytes",BytesOutPerSec.RateUnit="SECONDS",BytesInPerSec.OneMinuteRate=0,FailedFetchRequestsPerSec.EventType="requests",TotalFetchRequestsPerSec.MeanRate=146.301533938701,BytesOutPerSec.FifteenMinuteRate=0,TotalProduceRequestsPerSec.MeanRate=0,BytesRejectedPerSec.FifteenMinuteRate=0,MessagesInPerSec.FiveMinuteRate=0,BytesInPerSec.Count=0,BytesRejectedPerSec.MeanRate=0,FailedFetchRequestsPerSec.MeanRate=0,FailedFetchRequestsPerSec.FiveMinuteRate=0,FailedFetchRequestsPerSec.FifteenMinuteRate=0,FailedProduceRequestsPerSec.Count=0,TotalFetchRequestsPerSec.FifteenMinuteRate=128.59314292334466,TotalFetchRequestsPerSec.OneMinuteRate=126.71551273850747,TotalFetchRequestsPerSec.Count=1353483,TotalProduceRequestsPerSec.FifteenMinuteRate=0,FailedFetchRequestsPerSec.OneMinuteRate=0,FailedFetchRequestsPerSec.Count=0,FailedProduceRequestsPerSec.FifteenMinuteRate=0,TotalFetchRequestsPerSec.FiveMinuteRate=130.8516148751592,TotalFetchRequestsPerSec.RateUnit="SECONDS",BytesRejectedPerSec.RateUnit="SECONDS",BytesInPerSec.MeanRate=0,FailedFetchRequestsPerSec.RateUnit="SECONDS",BytesRejectedPerSec.OneMinuteRate=0,BytesOutPerSec.Count=0,BytesOutPerSec.OneMinuteRate=0,MessagesInPerSec.FifteenMinuteRate=0,MessagesInPerSec.MeanRate=0,BytesInPerSec.FiveMinuteRate=0,TotalProduceRequestsPerSec.RateUnit="SECONDS",FailedProduceRequestsPerSec.OneMinuteRate=0,TotalProduceRequestsPerSec.EventType="requests",BytesRejectedPerSec.FiveMinuteRate=0,BytesRejectedPerSec.EventType="bytes",BytesOutPerSec.FiveMinuteRate=0,FailedProduceRequestsPerSec.FiveMinuteRate=0,MessagesInPerSec.Count=0,TotalProduceRequestsPerSec.FiveMinuteRate=0,TotalProduceRequestsPerSec.OneMinuteRate=0,MessagesInPerSec.EventType="messages",MessagesInPerSec.OneMinuteRate=0,TotalFetchRequestsPerSec.EventType="requests",BytesInPerSec.RateUnit="SECONDS",BytesInPerSec.FifteenMinuteRate=0,TotalProduceRequestsPerSec.Count=0 1503767532000000000
```
Both `jolokia2_agent` and `jolokia2_proxy` plugins support default configurations that apply to every `metric` declaration.
| Key | Default Value | Description |
|---------------------------|---------------|-------------|
| `default_field_separator` | `.` | A character to use to join Mbean attributes when creating fields. |
| `default_field_prefix` | _None_ | A string to prepend to the field names produced by all `metric` declarations. |
| `default_tag_prefix` | _None_ | A string to prepend to the tag names produced by all `metric` declarations. |

View File

@ -0,0 +1,271 @@
package jolokia2
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"time"
"github.com/influxdata/telegraf/internal"
)
type Client struct {
URL string
client *http.Client
config *ClientConfig
}
type ClientConfig struct {
ResponseTimeout time.Duration
Username string
Password string
SSLCA string
SSLCert string
SSLKey string
InsecureSkipVerify bool
ProxyConfig *ProxyConfig
}
type ProxyConfig struct {
DefaultTargetUsername string
DefaultTargetPassword string
Targets []ProxyTargetConfig
}
type ProxyTargetConfig struct {
Username string
Password string
URL string
}
type ReadRequest struct {
Mbean string
Attributes []string
Path string
}
type ReadResponse struct {
Status int
Value interface{}
RequestMbean string
RequestAttributes []string
RequestPath string
RequestTarget string
}
// Jolokia JSON request object. Example: {
// "type": "read",
// "mbean: "java.lang:type="Runtime",
// "attribute": "Uptime",
// "target": {
// "url: "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"
// }
// }
type jolokiaRequest struct {
Type string `json:"type"`
Mbean string `json:"mbean"`
Attribute interface{} `json:"attribute,omitempty"`
Path string `json:"path,omitempty"`
Target *jolokiaTarget `json:"target,omitempty"`
}
type jolokiaTarget struct {
URL string `json:"url"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
}
// Jolokia JSON response object. Example: {
// "request": {
// "type": "read"
// "mbean": "java.lang:type=Runtime",
// "attribute": "Uptime",
// "target": {
// "url": "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"
// }
// },
// "value": 1214083,
// "timestamp": 1488059309,
// "status": 200
// }
type jolokiaResponse struct {
Request jolokiaRequest `json:"request"`
Value interface{} `json:"value"`
Status int `json:"status"`
}
func NewClient(url string, config *ClientConfig) (*Client, error) {
tlsConfig, err := internal.GetTLSConfig(
config.SSLCert, config.SSLKey, config.SSLCA, config.InsecureSkipVerify)
if err != nil {
return nil, err
}
transport := &http.Transport{
ResponseHeaderTimeout: config.ResponseTimeout,
TLSClientConfig: tlsConfig,
}
client := &http.Client{
Transport: transport,
Timeout: config.ResponseTimeout,
}
return &Client{
URL: url,
config: config,
client: client,
}, nil
}
func (c *Client) read(requests []ReadRequest) ([]ReadResponse, error) {
jrequests := makeJolokiaRequests(requests, c.config.ProxyConfig)
requestBody, err := json.Marshal(jrequests)
if err != nil {
return nil, err
}
requestUrl, err := formatReadUrl(c.URL, c.config.Username, c.config.Password)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", requestUrl, bytes.NewBuffer(requestBody))
req.Header.Add("Content-type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
c.URL, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, http.StatusText(http.StatusOK))
}
responseBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jresponses []jolokiaResponse
if err = json.Unmarshal([]byte(responseBody), &jresponses); err != nil {
return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, responseBody)
}
return makeReadResponses(jresponses), nil
}
func makeJolokiaRequests(rrequests []ReadRequest, proxyConfig *ProxyConfig) []jolokiaRequest {
jrequests := make([]jolokiaRequest, 0)
if proxyConfig == nil {
for _, rr := range rrequests {
jrequests = append(jrequests, makeJolokiaRequest(rr, nil))
}
} else {
for _, t := range proxyConfig.Targets {
if t.Username == "" {
t.Username = proxyConfig.DefaultTargetUsername
}
if t.Password == "" {
t.Password = proxyConfig.DefaultTargetPassword
}
for _, rr := range rrequests {
jtarget := &jolokiaTarget{
URL: t.URL,
User: t.Username,
Password: t.Password,
}
jrequests = append(jrequests, makeJolokiaRequest(rr, jtarget))
}
}
}
return jrequests
}
func makeJolokiaRequest(rrequest ReadRequest, jtarget *jolokiaTarget) jolokiaRequest {
jrequest := jolokiaRequest{
Type: "read",
Mbean: rrequest.Mbean,
Path: rrequest.Path,
Target: jtarget,
}
if len(rrequest.Attributes) == 1 {
jrequest.Attribute = rrequest.Attributes[0]
}
if len(rrequest.Attributes) > 1 {
jrequest.Attribute = rrequest.Attributes
}
return jrequest
}
func makeReadResponses(jresponses []jolokiaResponse) []ReadResponse {
rresponses := make([]ReadResponse, 0)
for _, jr := range jresponses {
rrequest := ReadRequest{
Mbean: jr.Request.Mbean,
Path: jr.Request.Path,
Attributes: []string{},
}
attrValue := jr.Request.Attribute
if attrValue != nil {
attribute, ok := attrValue.(string)
if ok {
rrequest.Attributes = []string{attribute}
} else {
attributes, _ := attrValue.([]interface{})
rrequest.Attributes = make([]string, len(attributes))
for i, attr := range attributes {
rrequest.Attributes[i] = attr.(string)
}
}
}
rresponse := ReadResponse{
Value: jr.Value,
Status: jr.Status,
RequestMbean: rrequest.Mbean,
RequestAttributes: rrequest.Attributes,
RequestPath: rrequest.Path,
}
if jtarget := jr.Request.Target; jtarget != nil {
rresponse.RequestTarget = jtarget.URL
}
rresponses = append(rresponses, rresponse)
}
return rresponses
}
func formatReadUrl(configUrl, username, password string) (string, error) {
parsedUrl, err := url.Parse(configUrl)
if err != nil {
return "", err
}
readUrl := url.URL{
Host: parsedUrl.Host,
Scheme: parsedUrl.Scheme,
}
if username != "" || password != "" {
readUrl.User = url.UserPassword(username, password)
}
readUrl.Path = path.Join(parsedUrl.Path, "read")
readUrl.Query().Add("ignoreErrors", "true")
return readUrl.String(), nil
}

View File

@ -0,0 +1,129 @@
package jolokia2
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/telegraf/testutil"
)
func TestJolokia2_ClientAuthRequest(t *testing.T) {
var username string
var password string
var requests []map[string]interface{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, _ = r.BasicAuth()
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &requests)
if err != nil {
t.Error(err)
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(`
[jolokia2_agent]
urls = ["%s/jolokia"]
username = "sally"
password = "seashore"
[[jolokia2_agent.metric]]
name = "hello"
mbean = "hello:foo=bar"
`, server.URL))
var acc testutil.Accumulator
plugin.Gather(&acc)
if username != "sally" {
t.Errorf("Expected to post with username %s, but was %s", "sally", username)
}
if password != "seashore" {
t.Errorf("Expected to post with password %s, but was %s", "seashore", password)
}
if len(requests) == 0 {
t.Fatal("Expected to post a request body, but was empty.")
}
request := requests[0]
if expect := "hello:foo=bar"; request["mbean"] != expect {
t.Errorf("Expected to query mbean %s, but was %s", expect, request["mbean"])
}
}
func TestJolokia2_ClientProxyAuthRequest(t *testing.T) {
var requests []map[string]interface{}
var username string
var password string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, _ = r.BasicAuth()
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &requests)
if err != nil {
t.Error(err)
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(`
[jolokia2_proxy]
url = "%s/jolokia"
username = "sally"
password = "seashore"
[[jolokia2_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"
username = "jack"
password = "benimble"
[[jolokia2_proxy.metric]]
name = "hello"
mbean = "hello:foo=bar"
`, server.URL))
var acc testutil.Accumulator
plugin.Gather(&acc)
if username != "sally" {
t.Errorf("Expected to post with username %s, but was %s", "sally", username)
}
if password != "seashore" {
t.Errorf("Expected to post with password %s, but was %s", "seashore", password)
}
if len(requests) == 0 {
t.Fatal("Expected to post a request body, but was empty.")
}
request := requests[0]
if expect := "hello:foo=bar"; request["mbean"] != expect {
t.Errorf("Expected to query mbean %s, but was %s", expect, request["mbean"])
}
target, ok := request["target"].(map[string]interface{})
if !ok {
t.Fatal("Expected a proxy target, but was empty.")
}
if expect := "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"; target["url"] != expect {
t.Errorf("Expected proxy target url %s, but was %s", expect, target["url"])
}
if expect := "jack"; target["user"] != expect {
t.Errorf("Expected proxy target username %s, but was %s", expect, target["user"])
}
if expect := "benimble"; target["password"] != expect {
t.Errorf("Expected proxy target password %s, but was %s", expect, target["password"])
}
}

View File

@ -0,0 +1,40 @@
[[inputs.jolokia2_agent]]
urls = ["http://localhost:8080/jolokia"]
[[inputs.jolokia2_agent.metric]]
name = "java_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
[[inputs.jolokia2_agent.metric]]
name = "java_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]
[[inputs.jolokia2_agent.metric]]
name = "java_garbage_collector"
mbean = "java.lang:name=G1*,type=GarbageCollector"
paths = ["CollectionTime", "CollectionCount"]
tag_keys = ["name"]
[[inputs.jolokia2_agent.metric]]
name = "java_last_garbage_collection"
mbean = "java.lang:name=G1 Young Generation,type=GarbageCollector"
paths = ["LastGcInfo/duration", "LastGcInfo/GcThreadCount", "LastGcInfo/memoryUsageAfterGc"]
[[inputs.jolokia2_agent.metrics]]
name = "java_threading"
mbean = "java.lang:type=Threading"
paths = ["TotalStartedThreadCount", "ThreadCount", "DaemonThreadCount", "PeakThreadCount"]
[[inputs.jolokia2_agent.metrics]]
name = "java_class_loading"
mbean = "java.lang:type=ClassLoading"
paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"]
[[inputs.jolokia2_agent.metrics]]
name = "java_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage", "CollectionUsage"]
tag_keys = ["name"]

View File

@ -0,0 +1,55 @@
[[inputs.jolokia2_agent]]
name_prefix = "kafka_"
urls = ["http://localhost:8080/jolokia"]
[[inputs.jolokia2_agent.metric]]
name = "controller"
mbean = "kafka.controller:name=*,type=*"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "replica_manager"
mbean = "kafka.server:name=*,type=ReplicaManager"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "purgatory"
mbean = "kafka.server:delayedOperation=*,name=*,type=DelayedOperationPurgatory"
field_prefix = "$1."
field_name = "$2"
[[inputs.jolokia2_agent.metric]]
name = "client"
mbean = "kafka.server:client-id=*,type=*"
tag_keys = ["client-id", "type"]
[[inputs.jolokia2_agent.metric]]
name = "request"
mbean = "kafka.network:name=*,request=*,type=RequestMetrics"
field_prefix = "$1."
tag_keys = ["request"]
[[inputs.jolokia2_agent.metric]]
name = "topics"
mbean = "kafka.server:name=*,type=BrokerTopicMetrics"
field_prefix = "$1."
[[inputs.jolokia2_agent.metric]]
name = "topic"
mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics"
field_prefix = "$1."
tag_keys = ["topic"]
[[inputs.jolokia2_agent.metric]]
name = "partition"
mbean = "kafka.log:name=*,partition=*,topic=*,type=Log"
field_name = "$1"
tag_keys = ["topic", "partition"]
[[inputs.jolokia2_agent.metric]]
name = "partition"
mbean = "kafka.cluster:name=UnderReplicated,partition=*,topic=*,type=Partition"
field_name = "UnderReplicatedPartitions"
tag_keys = ["topic", "partition"]

View File

@ -0,0 +1,277 @@
package jolokia2
import (
"fmt"
"sort"
"strings"
"github.com/influxdata/telegraf"
)
const defaultFieldName = "value"
type Gatherer struct {
metrics []Metric
requests []ReadRequest
}
func NewGatherer(metrics []Metric) *Gatherer {
return &Gatherer{
metrics: metrics,
requests: makeReadRequests(metrics),
}
}
// Gather adds points to an accumulator from responses returned
// by a Jolokia agent.
func (g *Gatherer) Gather(client *Client, acc telegraf.Accumulator) error {
var tags map[string]string
if client.config.ProxyConfig != nil {
tags = map[string]string{"jolokia_proxy_url": client.URL}
} else {
tags = map[string]string{"jolokia_agent_url": client.URL}
}
requests := makeReadRequests(g.metrics)
responses, err := client.read(requests)
if err != nil {
return err
}
g.gatherResponses(responses, tags, acc)
return nil
}
// gatherReponses adds points to an accumulator from the ReadResponse objects
// returned by a Jolokia agent.
func (g *Gatherer) gatherResponses(responses []ReadResponse, tags map[string]string, acc telegraf.Accumulator) {
series := make(map[string][]point, 0)
for _, metric := range g.metrics {
points, ok := series[metric.Name]
if !ok {
points = make([]point, 0)
}
responsePoints, responseErrors := g.generatePoints(metric, responses)
for _, responsePoint := range responsePoints {
points = append(points, responsePoint)
}
for _, err := range responseErrors {
acc.AddError(err)
}
series[metric.Name] = points
}
for measurement, points := range series {
for _, point := range compactPoints(points) {
acc.AddFields(measurement,
point.Fields, mergeTags(point.Tags, tags))
}
}
}
// generatePoints creates points for the supplied metric from the ReadResponse
// objects returned by the Jolokia client.
func (g *Gatherer) generatePoints(metric Metric, responses []ReadResponse) ([]point, []error) {
points := make([]point, 0)
errors := make([]error, 0)
for _, response := range responses {
switch response.Status {
case 200:
break
case 404:
continue
default:
errors = append(errors, fmt.Errorf("Unexpected status in response from target %s: %d",
response.RequestTarget, response.Status))
continue
}
if !metricMatchesResponse(metric, response) {
continue
}
pb := newPointBuilder(metric, response.RequestAttributes, response.RequestPath)
for _, point := range pb.Build(metric.Mbean, response.Value) {
if response.RequestTarget != "" {
point.Tags["jolokia_agent_url"] = response.RequestTarget
}
points = append(points, point)
}
}
return points, errors
}
// mergeTags combines two tag sets into a single tag set.
func mergeTags(metricTags, outerTags map[string]string) map[string]string {
tags := make(map[string]string)
for k, v := range outerTags {
tags[k] = v
}
for k, v := range metricTags {
tags[k] = v
}
return tags
}
// metricMatchesResponse returns true when the name, attributes, and path
// of a Metric match the corresponding elements in a ReadResponse object
// returned by a Jolokia agent.
func metricMatchesResponse(metric Metric, response ReadResponse) bool {
if metric.Mbean != response.RequestMbean {
return false
}
if len(metric.Paths) == 0 {
return len(response.RequestAttributes) == 0
}
for _, fullPath := range metric.Paths {
segments := strings.SplitN(fullPath, "/", 2)
attribute := segments[0]
var path string
if len(segments) == 2 {
path = segments[1]
}
for _, rattr := range response.RequestAttributes {
if attribute == rattr && path == response.RequestPath {
return true
}
}
}
return false
}
// compactPoints attepts to remove points by compacting points
// with matching tag sets. When a match is found, the fields from
// one point are moved to another, and the empty point is removed.
func compactPoints(points []point) []point {
compactedPoints := make([]point, 0)
for _, sourcePoint := range points {
keepPoint := true
for _, compactPoint := range compactedPoints {
if !tagSetsMatch(sourcePoint.Tags, compactPoint.Tags) {
continue
}
keepPoint = false
for key, val := range sourcePoint.Fields {
compactPoint.Fields[key] = val
}
}
if keepPoint {
compactedPoints = append(compactedPoints, sourcePoint)
}
}
return compactedPoints
}
// tagSetsMatch returns true if two maps are equivalent.
func tagSetsMatch(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for ak, av := range a {
bv, ok := b[ak]
if !ok {
return false
}
if av != bv {
return false
}
}
return true
}
// makeReadRequests creates ReadRequest objects from metrics definitions.
func makeReadRequests(metrics []Metric) []ReadRequest {
var requests []ReadRequest
for _, metric := range metrics {
if len(metric.Paths) == 0 {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: []string{},
})
} else {
attributes := make(map[string][]string)
for _, path := range metric.Paths {
segments := strings.Split(path, "/")
attribute := segments[0]
if _, ok := attributes[attribute]; !ok {
attributes[attribute] = make([]string, 0)
}
if len(segments) > 1 {
paths := attributes[attribute]
attributes[attribute] = append(paths, strings.Join(segments[1:], "/"))
}
}
rootAttributes := findRequestAttributesWithoutPaths(attributes)
if len(rootAttributes) > 0 {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: rootAttributes,
})
}
for _, deepAttribute := range findRequestAttributesWithPaths(attributes) {
for _, path := range attributes[deepAttribute] {
requests = append(requests, ReadRequest{
Mbean: metric.Mbean,
Attributes: []string{deepAttribute},
Path: path,
})
}
}
}
}
return requests
}
func findRequestAttributesWithoutPaths(attributes map[string][]string) []string {
results := make([]string, 0)
for attr, paths := range attributes {
if len(paths) == 0 {
results = append(results, attr)
}
}
sort.Strings(results)
return results
}
func findRequestAttributesWithPaths(attributes map[string][]string) []string {
results := make([]string, 0)
for attr, paths := range attributes {
if len(paths) != 0 {
results = append(results, attr)
}
}
sort.Strings(results)
return results
}

View File

@ -0,0 +1,104 @@
package jolokia2
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestJolokia2_makeReadRequests(t *testing.T) {
cases := []struct {
metric Metric
expected []ReadRequest
}{
{
metric: Metric{
Name: "object",
Mbean: "test:foo=bar",
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{},
},
},
}, {
metric: Metric{
Name: "object_with_an_attribute",
Mbean: "test:foo=bar",
Paths: []string{"biz"},
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"biz"},
},
},
}, {
metric: Metric{
Name: "object_with_attributes",
Mbean: "test:foo=bar",
Paths: []string{"baz", "biz"},
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"baz", "biz"},
},
},
}, {
metric: Metric{
Name: "object_with_an_attribute_and_path",
Mbean: "test:foo=bar",
Paths: []string{"biz/baz"},
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"biz"},
Path: "baz",
},
},
}, {
metric: Metric{
Name: "object_with_an_attribute_and_a_deep_path",
Mbean: "test:foo=bar",
Paths: []string{"biz/baz/fiz/faz"},
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"biz"},
Path: "baz/fiz/faz",
},
},
}, {
metric: Metric{
Name: "object_with_attributes_and_paths",
Mbean: "test:foo=bar",
Paths: []string{"baz/biz", "faz/fiz"},
},
expected: []ReadRequest{
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"baz"},
Path: "biz",
},
ReadRequest{
Mbean: "test:foo=bar",
Attributes: []string{"faz"},
Path: "fiz",
},
},
},
}
for _, c := range cases {
payload := makeReadRequests([]Metric{c.metric})
assert.Equal(t, len(c.expected), len(payload), "Failing case: "+c.metric.Name)
for _, actual := range payload {
assert.Contains(t, c.expected, actual, "Failing case: "+c.metric.Name)
}
}
}

View File

@ -0,0 +1,21 @@
package jolokia2
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func init() {
inputs.Add("jolokia2_agent", func() telegraf.Input {
return &JolokiaAgent{
Metrics: []MetricConfig{},
DefaultFieldSeparator: ".",
}
})
inputs.Add("jolokia2_proxy", func() telegraf.Input {
return &JolokiaProxy{
Metrics: []MetricConfig{},
DefaultFieldSeparator: ".",
}
})
}

View File

@ -0,0 +1,112 @@
package jolokia2
import (
"fmt"
"sync"
"time"
"github.com/influxdata/telegraf"
)
type JolokiaAgent struct {
DefaultFieldPrefix string
DefaultFieldSeparator string
DefaultTagPrefix string
URLs []string `toml:"urls"`
Username string
Password string
ResponseTimeout time.Duration `toml:"response_timeout"`
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
InsecureSkipVerify bool
Metrics []MetricConfig `toml:"metric"`
gatherer *Gatherer
}
func (ja *JolokiaAgent) SampleConfig() string {
return `
# default_tag_prefix = ""
# default_field_prefix = ""
# default_field_separator = "."
# Add agents URLs to query
urls = ["http://localhost:8080/jolokia"]
# username = ""
# password = ""
# response_timeout = "5s"
## Optional SSL config
# ssl_ca = "/var/private/ca.pem"
# ssl_cert = "/var/private/client.pem"
# ssl_key = "/var/private/client-key.pem"
# insecure_skip_verify = false
## Add metrics to read
[[inputs.jolokia2.metric]]
name = "java_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
`
}
func (ja *JolokiaAgent) Description() string {
return "Read JMX metrics from a Jolokia REST agent endpoint"
}
func (ja *JolokiaAgent) Gather(acc telegraf.Accumulator) error {
if ja.gatherer == nil {
ja.gatherer = NewGatherer(ja.createMetrics())
}
var wg sync.WaitGroup
for _, url := range ja.URLs {
client, err := ja.createClient(url)
if err != nil {
acc.AddError(fmt.Errorf("Unable to create client for %s: %v", url, err))
continue
}
wg.Add(1)
go func(client *Client) {
defer wg.Done()
err = ja.gatherer.Gather(client, acc)
if err != nil {
acc.AddError(fmt.Errorf("Unable to gather metrics for %s: %v", client.URL, err))
}
}(client)
}
wg.Wait()
return nil
}
func (ja *JolokiaAgent) createMetrics() []Metric {
var metrics []Metric
for _, config := range ja.Metrics {
metrics = append(metrics, NewMetric(config,
ja.DefaultFieldPrefix, ja.DefaultFieldSeparator, ja.DefaultTagPrefix))
}
return metrics
}
func (ja *JolokiaAgent) createClient(url string) (*Client, error) {
return NewClient(url, &ClientConfig{
Username: ja.Username,
Password: ja.Password,
ResponseTimeout: ja.ResponseTimeout,
SSLCA: ja.SSLCA,
SSLCert: ja.SSLCert,
SSLKey: ja.SSLKey,
InsecureSkipVerify: ja.InsecureSkipVerify,
})
}

View File

@ -0,0 +1,129 @@
package jolokia2
import (
"time"
"github.com/influxdata/telegraf"
)
type JolokiaProxy struct {
DefaultFieldPrefix string
DefaultFieldSeparator string
DefaultTagPrefix string
URL string `toml:"url"`
DefaultTargetPassword string
DefaultTargetUsername string
Targets []JolokiaProxyTargetConfig `toml:"target"`
Username string
Password string
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
InsecureSkipVerify bool
ResponseTimeout time.Duration `toml:"response_timeout"`
Metrics []MetricConfig `toml:"metric"`
client *Client
gatherer *Gatherer
}
type JolokiaProxyTargetConfig struct {
URL string `toml:"url"`
Username string
Password string
}
func (jp *JolokiaProxy) SampleConfig() string {
return `
# default_tag_prefix = ""
# default_field_prefix = ""
# default_field_separator = "."
## Proxy agent
url = "http://localhost:8080/jolokia"
# username = ""
# password = ""
# response_timeout = "5s"
## Optional SSL config
# ssl_ca = "/var/private/ca.pem"
# ssl_cert = "/var/private/client.pem"
# ssl_key = "/var/private/client-key.pem"
# insecure_skip_verify = false
## Add proxy targets to query
# default_target_username = ""
# default_target_password = ""
[[inputs.jolokia_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi"
# username = ""
# password = ""
## Add metrics to read
[[inputs.jolokia_proxy.metric]]
name = "java_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]
`
}
func (jp *JolokiaProxy) Description() string {
return "Read JMX metrics from a Jolokia REST proxy endpoint"
}
func (jp *JolokiaProxy) Gather(acc telegraf.Accumulator) error {
if jp.gatherer == nil {
jp.gatherer = NewGatherer(jp.createMetrics())
}
if jp.client == nil {
client, err := jp.createClient()
if err != nil {
return err
}
jp.client = client
}
return jp.gatherer.Gather(jp.client, acc)
}
func (jp *JolokiaProxy) createMetrics() []Metric {
var metrics []Metric
for _, config := range jp.Metrics {
metrics = append(metrics, NewMetric(config,
jp.DefaultFieldPrefix, jp.DefaultFieldSeparator, jp.DefaultTagPrefix))
}
return metrics
}
func (jp *JolokiaProxy) createClient() (*Client, error) {
proxyConfig := &ProxyConfig{
DefaultTargetUsername: jp.DefaultTargetUsername,
DefaultTargetPassword: jp.DefaultTargetPassword,
}
for _, target := range jp.Targets {
proxyConfig.Targets = append(proxyConfig.Targets, ProxyTargetConfig{
URL: target.URL,
Username: target.Username,
Password: target.Password,
})
}
return NewClient(jp.URL, &ClientConfig{
Username: jp.Username,
Password: jp.Password,
ResponseTimeout: jp.ResponseTimeout,
SSLCA: jp.SSLCA,
SSLCert: jp.SSLCert,
SSLKey: jp.SSLKey,
InsecureSkipVerify: jp.InsecureSkipVerify,
ProxyConfig: proxyConfig,
})
}

View File

@ -0,0 +1,673 @@
package jolokia2
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/stretchr/testify/assert"
)
func TestJolokia2_ScalarValues(t *testing.T) {
config := `
[jolokia2_agent]
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "scalar_without_attribute"
mbean = "scalar_without_attribute"
[[jolokia2_agent.metric]]
name = "scalar_with_attribute"
mbean = "scalar_with_attribute"
paths = ["biz"]
[[jolokia2_agent.metric]]
name = "scalar_with_attribute_and_path"
mbean = "scalar_with_attribute_and_path"
paths = ["biz/baz"]
# This should return multiple series with different test tags.
[[jolokia2_agent.metric]]
name = "scalar_with_key_pattern"
mbean = "scalar_with_key_pattern:test=*"
tag_keys = ["test"]`
response := `[{
"request": {
"mbean": "scalar_without_attribute",
"type": "read"
},
"value": 123,
"status": 200
}, {
"request": {
"mbean": "scalar_with_attribute",
"attribute": "biz",
"type": "read"
},
"value": 456,
"status": 200
}, {
"request": {
"mbean": "scalar_with_attribute_and_path",
"attribute": "biz",
"path": "baz",
"type": "read"
},
"value": 789,
"status": 200
}, {
"request": {
"mbean": "scalar_with_key_pattern:test=*",
"type": "read"
},
"value": {
"scalar_with_key_pattern:test=foo": 123,
"scalar_with_key_pattern:test=bar": 456
},
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "scalar_without_attribute", map[string]interface{}{
"value": 123.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "scalar_with_attribute", map[string]interface{}{
"biz": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "scalar_with_attribute_and_path", map[string]interface{}{
"biz.baz": 789.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "scalar_with_key_pattern", map[string]interface{}{
"value": 123.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
"test": "foo",
})
acc.AssertContainsTaggedFields(t, "scalar_with_key_pattern", map[string]interface{}{
"value": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
"test": "bar",
})
}
func TestJolokia2_ObjectValues(t *testing.T) {
config := `
[jolokia2_agent]
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "object_without_attribute"
mbean = "object_without_attribute"
tag_keys = ["foo"]
[[jolokia2_agent.metric]]
name = "object_with_attribute"
mbean = "object_with_attribute"
paths = ["biz"]
[[jolokia2_agent.metric]]
name = "object_with_attribute_and_path"
mbean = "object_with_attribute_and_path"
paths = ["biz/baz"]
# This will generate two separate request objects.
[[jolokia2_agent.metric]]
name = "object_with_branching_paths"
mbean = "object_with_branching_paths"
paths = ["foo/fiz", "foo/faz"]
# This should return multiple series with different test tags.
[[jolokia2_agent.metric]]
name = "object_with_key_pattern"
mbean = "object_with_key_pattern:test=*"
tag_keys = ["test"]`
response := `[{
"request": {
"mbean": "object_without_attribute",
"type": "read"
},
"value": {
"biz": 123,
"baz": 456
},
"status": 200
}, {
"request": {
"mbean": "object_with_attribute",
"attribute": "biz",
"type": "read"
},
"value": {
"fiz": 123,
"faz": 456
},
"status": 200
}, {
"request": {
"mbean": "object_with_branching_paths",
"attribute": "foo",
"path": "fiz",
"type": "read"
},
"value": {
"bing": 123
},
"status": 200
}, {
"request": {
"mbean": "object_with_branching_paths",
"attribute": "foo",
"path": "faz",
"type": "read"
},
"value": {
"bang": 456
},
"status": 200
}, {
"request": {
"mbean": "object_with_attribute_and_path",
"attribute": "biz",
"path": "baz",
"type": "read"
},
"value": {
"bing": 123,
"bang": 456
},
"status": 200
}, {
"request": {
"mbean": "object_with_key_pattern:test=*",
"type": "read"
},
"value": {
"object_with_key_pattern:test=foo": {
"fiz": 123
},
"object_with_key_pattern:test=bar": {
"biz": 456
}
},
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "object_without_attribute", map[string]interface{}{
"biz": 123.0,
"baz": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "object_with_attribute", map[string]interface{}{
"biz.fiz": 123.0,
"biz.faz": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "object_with_attribute_and_path", map[string]interface{}{
"biz.baz.bing": 123.0,
"biz.baz.bang": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "object_with_branching_paths", map[string]interface{}{
"foo.fiz.bing": 123.0,
"foo.faz.bang": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "object_with_key_pattern", map[string]interface{}{
"fiz": 123.0,
}, map[string]string{
"test": "foo",
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "object_with_key_pattern", map[string]interface{}{
"biz": 456.0,
}, map[string]string{
"test": "bar",
"jolokia_agent_url": server.URL,
})
}
func TestJolokia2_StatusCodes(t *testing.T) {
config := `
[jolokia2_agent]
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "ok"
mbean = "ok"
[[jolokia2_agent.metric]]
name = "not_found"
mbean = "not_found"
[[jolokia2_agent.metric]]
name = "unknown"
mbean = "unknown"`
response := `[{
"request": {
"mbean": "ok",
"type": "read"
},
"value": 1,
"status": 200
}, {
"request": {
"mbean": "not_found",
"type": "read"
},
"status": 404
}, {
"request": {
"mbean": "unknown",
"type": "read"
},
"status": 500
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "ok", map[string]interface{}{
"value": 1.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertDoesNotContainMeasurement(t, "not_found")
acc.AssertDoesNotContainMeasurement(t, "unknown")
}
func TestJolokia2_TagRenaming(t *testing.T) {
config := `
[jolokia2_agent]
default_tag_prefix = "DEFAULT_PREFIX_"
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "default_tag_prefix"
mbean = "default_tag_prefix:biz=baz,fiz=faz"
tag_keys = ["biz", "fiz"]
[[jolokia2_agent.metric]]
name = "custom_tag_prefix"
mbean = "custom_tag_prefix:biz=baz,fiz=faz"
tag_keys = ["biz", "fiz"]
tag_prefix = "CUSTOM_PREFIX_"`
response := `[{
"request": {
"mbean": "default_tag_prefix:biz=baz,fiz=faz",
"type": "read"
},
"value": 123,
"status": 200
}, {
"request": {
"mbean": "custom_tag_prefix:biz=baz,fiz=faz",
"type": "read"
},
"value": 123,
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "default_tag_prefix", map[string]interface{}{
"value": 123.0,
}, map[string]string{
"DEFAULT_PREFIX_biz": "baz",
"DEFAULT_PREFIX_fiz": "faz",
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "custom_tag_prefix", map[string]interface{}{
"value": 123.0,
}, map[string]string{
"CUSTOM_PREFIX_biz": "baz",
"CUSTOM_PREFIX_fiz": "faz",
"jolokia_agent_url": server.URL,
})
}
func TestJolokia2_FieldRenaming(t *testing.T) {
config := `
[jolokia2_agent]
default_field_prefix = "DEFAULT_PREFIX_"
default_field_separator = "_DEFAULT_SEPARATOR_"
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "default_field_modifiers"
mbean = "default_field_modifiers"
[[jolokia2_agent.metric]]
name = "custom_field_modifiers"
mbean = "custom_field_modifiers"
field_prefix = "CUSTOM_PREFIX_"
field_separator = "_CUSTOM_SEPARATOR_"
[[jolokia2_agent.metric]]
name = "field_prefix_substitution"
mbean = "field_prefix_substitution:foo=*"
field_prefix = "$1_"
[[jolokia2_agent.metric]]
name = "field_name_substitution"
mbean = "field_name_substitution:foo=*"
field_prefix = ""
field_name = "$1"`
response := `[{
"request": {
"mbean": "default_field_modifiers",
"type": "read"
},
"value": {
"hello": { "world": 123 }
},
"status": 200
}, {
"request": {
"mbean": "custom_field_modifiers",
"type": "read"
},
"value": {
"hello": { "world": 123 }
},
"status": 200
}, {
"request": {
"mbean": "field_prefix_substitution:foo=*",
"type": "read"
},
"value": {
"field_prefix_substitution:foo=biz": 123,
"field_prefix_substitution:foo=baz": 456
},
"status": 200
}, {
"request": {
"mbean": "field_name_substitution:foo=*",
"type": "read"
},
"value": {
"field_name_substitution:foo=biz": 123,
"field_name_substitution:foo=baz": 456
},
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "default_field_modifiers", map[string]interface{}{
"DEFAULT_PREFIX_hello_DEFAULT_SEPARATOR_world": 123.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "custom_field_modifiers", map[string]interface{}{
"CUSTOM_PREFIX_hello_CUSTOM_SEPARATOR_world": 123.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "field_prefix_substitution", map[string]interface{}{
"biz_value": 123.0,
"baz_value": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "field_name_substitution", map[string]interface{}{
"biz": 123.0,
"baz": 456.0,
}, map[string]string{
"jolokia_agent_url": server.URL,
})
}
func TestJolokia2_MetricCompaction(t *testing.T) {
config := `
[jolokia2_agent]
urls = ["%s"]
[[jolokia2_agent.metric]]
name = "compact_metric"
mbean = "scalar_value:flavor=chocolate"
tag_keys = ["flavor"]
[[jolokia2_agent.metric]]
name = "compact_metric"
mbean = "scalar_value:flavor=vanilla"
tag_keys = ["flavor"]
[[jolokia2_agent.metric]]
name = "compact_metric"
mbean = "object_value1:flavor=chocolate"
tag_keys = ["flavor"]
[[jolokia2_agent.metric]]
name = "compact_metric"
mbean = "object_value2:flavor=chocolate"
tag_keys = ["flavor"]`
response := `[{
"request": {
"mbean": "scalar_value:flavor=chocolate",
"type": "read"
},
"value": 123,
"status": 200
}, {
"request": {
"mbean": "scalar_value:flavor=vanilla",
"type": "read"
},
"value": 999,
"status": 200
}, {
"request": {
"mbean": "object_value1:flavor=chocolate",
"type": "read"
},
"value": {
"foo": 456
},
"status": 200
}, {
"request": {
"mbean": "object_value2:flavor=chocolate",
"type": "read"
},
"value": {
"bar": 789
},
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "compact_metric", map[string]interface{}{
"value": 123.0,
"foo": 456.0,
"bar": 789.0,
}, map[string]string{
"flavor": "chocolate",
"jolokia_agent_url": server.URL,
})
acc.AssertContainsTaggedFields(t, "compact_metric", map[string]interface{}{
"value": 999.0,
}, map[string]string{
"flavor": "vanilla",
"jolokia_agent_url": server.URL,
})
}
func TestJolokia2_ProxyTargets(t *testing.T) {
config := `
[jolokia2_proxy]
url = "%s"
[[jolokia2_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi"
[[jolokia2_proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi"
[[jolokia2_proxy.metric]]
name = "hello"
mbean = "hello:foo=bar"`
response := `[{
"request": {
"type": "read",
"mbean": "hello:foo=bar",
"target": {
"url": "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi"
}
},
"value": 123,
"status": 200
}, {
"request": {
"type": "read",
"mbean": "hello:foo=bar",
"target": {
"url": "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi"
}
},
"value": 456,
"status": 200
}]`
server := setupServer(http.StatusOK, response)
defer server.Close()
plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator
assert.NoError(t, plugin.Gather(&acc))
acc.AssertContainsTaggedFields(t, "hello", map[string]interface{}{
"value": 123.0,
}, map[string]string{
"jolokia_proxy_url": server.URL,
"jolokia_agent_url": "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi",
})
acc.AssertContainsTaggedFields(t, "hello", map[string]interface{}{
"value": 456.0,
}, map[string]string{
"jolokia_proxy_url": server.URL,
"jolokia_agent_url": "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi",
})
}
func setupServer(status int, resp string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
//body, err := ioutil.ReadAll(r.Body)
//if err == nil {
// fmt.Println(string(body))
//}
fmt.Fprintln(w, resp)
}))
}
func setupPlugin(t *testing.T, conf string) telegraf.Input {
table, err := toml.Parse([]byte(conf))
if err != nil {
t.Fatalf("Unable to parse config! %v", err)
}
for name, _ := range table.Fields {
object := table.Fields[name]
switch name {
case "jolokia2_agent":
plugin := JolokiaAgent{
Metrics: []MetricConfig{},
DefaultFieldSeparator: ".",
}
if err := toml.UnmarshalTable(object.(*ast.Table), &plugin); err != nil {
t.Fatalf("Unable to parse jolokia_agent plugin config! %v", err)
}
return &plugin
case "jolokia2_proxy":
plugin := JolokiaProxy{
Metrics: []MetricConfig{},
DefaultFieldSeparator: ".",
}
if err := toml.UnmarshalTable(object.(*ast.Table), &plugin); err != nil {
t.Fatalf("Unable to parse jolokia_proxy plugin config! %v", err)
}
return &plugin
}
}
return nil
}

View File

@ -0,0 +1,61 @@
package jolokia2
// A MetricConfig represents a TOML form of
// a Metric with some optional fields.
type MetricConfig struct {
Name string
Mbean string
Paths []string
FieldName *string
FieldPrefix *string
FieldSeparator *string
TagPrefix *string
TagKeys []string
}
// A Metric represents a specification for a
// Jolokia read request, and the transformations
// to apply to points generated from the responses.
type Metric struct {
Name string
Mbean string
Paths []string
FieldName string
FieldPrefix string
FieldSeparator string
TagPrefix string
TagKeys []string
}
func NewMetric(config MetricConfig, defaultFieldPrefix, defaultFieldSeparator, defaultTagPrefix string) Metric {
metric := Metric{
Name: config.Name,
Mbean: config.Mbean,
Paths: config.Paths,
TagKeys: config.TagKeys,
}
if config.FieldName != nil {
metric.FieldName = *config.FieldName
}
if config.FieldPrefix == nil {
metric.FieldPrefix = defaultFieldPrefix
} else {
metric.FieldPrefix = *config.FieldPrefix
}
if config.FieldSeparator == nil {
metric.FieldSeparator = defaultFieldSeparator
} else {
metric.FieldSeparator = *config.FieldSeparator
}
if config.TagPrefix == nil {
metric.TagPrefix = defaultTagPrefix
} else {
metric.TagPrefix = *config.TagPrefix
}
return metric
}

View File

@ -0,0 +1,271 @@
package jolokia2
import (
"fmt"
"strings"
)
type point struct {
Tags map[string]string
Fields map[string]interface{}
}
type pointBuilder struct {
metric Metric
objectAttributes []string
objectPath string
substitutions []string
}
func newPointBuilder(metric Metric, attributes []string, path string) *pointBuilder {
return &pointBuilder{
metric: metric,
objectAttributes: attributes,
objectPath: path,
substitutions: makeSubstitutionList(metric.Mbean),
}
}
// Build generates a point for a given mbean name/pattern and value object.
func (pb *pointBuilder) Build(mbean string, value interface{}) []point {
hasPattern := strings.Contains(mbean, "*")
if !hasPattern {
value = map[string]interface{}{mbean: value}
}
valueMap, ok := value.(map[string]interface{})
if !ok { // FIXME: log it and move on.
panic(fmt.Sprintf("There should be a map here for %s!\n", mbean))
}
points := make([]point, 0)
for mbean, value := range valueMap {
points = append(points, point{
Tags: pb.extractTags(mbean),
Fields: pb.extractFields(mbean, value),
})
}
return compactPoints(points)
}
// extractTags generates the map of tags for a given mbean name/pattern.
func (pb *pointBuilder) extractTags(mbean string) map[string]string {
propertyMap := makePropertyMap(mbean)
tagMap := make(map[string]string)
for key, value := range propertyMap {
if pb.includeTag(key) {
tagName := pb.formatTagName(key)
tagMap[tagName] = value
}
}
return tagMap
}
func (pb *pointBuilder) includeTag(tagName string) bool {
for _, t := range pb.metric.TagKeys {
if tagName == t {
return true
}
}
return false
}
func (pb *pointBuilder) formatTagName(tagName string) string {
if tagName == "" {
return ""
}
if tagPrefix := pb.metric.TagPrefix; tagPrefix != "" {
return tagPrefix + tagName
}
return tagName
}
// extractFields generates the map of fields for a given mbean name
// and value object.
func (pb *pointBuilder) extractFields(mbean string, value interface{}) map[string]interface{} {
fieldMap := make(map[string]interface{})
valueMap, ok := value.(map[string]interface{})
if ok {
// complex value
if len(pb.objectAttributes) == 0 {
// if there were no attributes requested,
// then the keys are attributes
pb.fillFields("", valueMap, fieldMap)
} else if len(pb.objectAttributes) == 1 {
// if there was a single attribute requested,
// then the keys are the attribute's properties
fieldName := pb.formatFieldName(pb.objectAttributes[0], pb.objectPath)
pb.fillFields(fieldName, valueMap, fieldMap)
} else {
// if there were multiple attributes requested,
// then the keys are the attribute names
for _, attribute := range pb.objectAttributes {
fieldName := pb.formatFieldName(attribute, pb.objectPath)
pb.fillFields(fieldName, valueMap[attribute], fieldMap)
}
}
} else {
// scalar value
var fieldName string
if len(pb.objectAttributes) == 0 {
fieldName = pb.formatFieldName(defaultFieldName, pb.objectPath)
} else {
fieldName = pb.formatFieldName(pb.objectAttributes[0], pb.objectPath)
}
pb.fillFields(fieldName, value, fieldMap)
}
if len(pb.substitutions) > 1 {
pb.applySubstitutions(mbean, fieldMap)
}
return fieldMap
}
// formatFieldName generates a field name from the supplied attribute and
// path. The return value has the configured FieldPrefix and FieldSuffix
// instructions applied.
func (pb *pointBuilder) formatFieldName(attribute, path string) string {
fieldName := attribute
fieldPrefix := pb.metric.FieldPrefix
fieldSeparator := pb.metric.FieldSeparator
if fieldPrefix != "" {
fieldName = fieldPrefix + fieldName
}
if path != "" {
fieldName = fieldName + fieldSeparator + strings.Replace(path, "/", fieldSeparator, -1)
}
return fieldName
}
// fillFields recurses into the supplied value object, generating a named field
// for every value it discovers.
func (pb *pointBuilder) fillFields(name string, value interface{}, fieldMap map[string]interface{}) {
if valueMap, ok := value.(map[string]interface{}); ok {
// keep going until we get to something that is not a map
for key, innerValue := range valueMap {
var innerName string
if name == "" {
innerName = pb.metric.FieldPrefix + key
} else {
innerName = name + pb.metric.FieldSeparator + key
}
pb.fillFields(innerName, innerValue, fieldMap)
}
return
}
if pb.metric.FieldName != "" {
name = pb.metric.FieldName
if prefix := pb.metric.FieldPrefix; prefix != "" {
name = prefix + name
}
}
if name == "" {
name = defaultFieldName
}
fieldMap[name] = value
}
// applySubstitutions updates all the keys in the supplied map
// of fields to account for $1-style substitution instructions.
func (pb *pointBuilder) applySubstitutions(mbean string, fieldMap map[string]interface{}) {
properties := makePropertyMap(mbean)
for i, subKey := range pb.substitutions[1:] {
symbol := fmt.Sprintf("$%d", i+1)
substitution := properties[subKey]
for fieldName, fieldValue := range fieldMap {
newFieldName := strings.Replace(fieldName, symbol, substitution, -1)
if fieldName != newFieldName {
fieldMap[newFieldName] = fieldValue
delete(fieldMap, fieldName)
}
}
}
}
// makePropertyMap returns a the mbean property-key list as
// a dictionary. foo:x=y becomes map[string]string { "x": "y" }
func makePropertyMap(mbean string) map[string]string {
props := make(map[string]string)
object := strings.SplitN(mbean, ":", 2)
domain := object[0]
if domain != "" && len(object) == 2 {
list := object[1]
for _, keyProperty := range strings.Split(list, ",") {
pair := strings.SplitN(keyProperty, "=", 2)
if len(pair) != 2 {
continue
}
if key := pair[0]; key != "" {
props[key] = pair[1]
}
}
}
return props
}
// makeSubstitutionList returns an array of values to
// use as substitutions when renaming fields
// with the $1..$N syntax. The first item in the list
// is always the mbean domain.
func makeSubstitutionList(mbean string) []string {
subs := make([]string, 0)
object := strings.SplitN(mbean, ":", 2)
domain := object[0]
if domain != "" && len(object) == 2 {
subs = append(subs, domain)
list := object[1]
for _, keyProperty := range strings.Split(list, ",") {
pair := strings.SplitN(keyProperty, "=", 2)
if len(pair) != 2 {
continue
}
key := pair[0]
if key == "" {
continue
}
property := pair[1]
if !strings.Contains(property, "*") {
continue
}
subs = append(subs, key)
}
}
return subs
}