diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 7281f5b7a..45d040503 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -36,6 +36,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" + _ "github.com/influxdata/telegraf/plugins/inputs/jolokia2" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" diff --git a/plugins/inputs/jolokia2/README.md b/plugins/inputs/jolokia2/README.md new file mode 100644 index 000000000..0ab985e5b --- /dev/null +++ b/plugins/inputs/jolokia2/README.md @@ -0,0 +1,169 @@ +# Jolokia2 Input Plugins + +The [Jolokia](http://jolokia.org) _agent_ and _proxy_ input plugins collect JMX metrics from an HTTP endpoint using Jolokia's [JSON-over-HTTP protocol](https://jolokia.org/reference/html/protocol.html). + +## Jolokia Agent Configuration + +The `jolokia2_agent` input plugin reads JMX metrics from one or more [Jolokia agent](https://jolokia.org/agent/jvm.html) REST endpoints. + +```toml +[[inputs.jolokia2_agent]] + urls = ["http://agent:8080/jolokia"] + + [[inputs.jolokia2_agent.metric]] + name = "jvm_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +``` + +Optionally, specify SSL options for communicating with agents: + +```toml +[[inputs.jolokia2_agent]] + urls = ["https://agent:8080/jolokia"] + ssl_ca = "/var/private/ca.pem" + ssl_cert = "/var/private/client.pem" + ssl_key = "/var/private/client-key.pem" + #insecure_skip_verify = false + + [[inputs.jolokia2_agent.metric]] + name = "jvm_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +``` + +## Jolokia Proxy Configuration + +The `jolokia2_proxy` input plugin reads JMX metrics from one or more _targets_ by interacting with a [Jolokia proxy](https://jolokia.org/features/proxy.html) REST endpoint. + +```toml +[[inputs.jolokia2_proxy]] + url = "http://proxy:8080/jolokia" + + #default_target_username = "" + #default_target_password = "" + [[inputs.jolokia2_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi" + # username = "" + # password = "" + + [[inputs.jolokia2_proxy.metric]] + name = "jvm_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +``` + +Optionally, specify SSL options for communicating with proxies: + +```toml +[[inputs.jolokia2_proxy]] + url = "https://proxy:8080/jolokia" + + ssl_ca = "/var/private/ca.pem" + ssl_cert = "/var/private/client.pem" + ssl_key = "/var/private/client-key.pem" + #insecure_skip_verify = false + + #default_target_username = "" + #default_target_password = "" + [[inputs.jolokia2_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi" + # username = "" + # password = "" + + [[inputs.jolokia2_agent.metric]] + name = "jvm_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +``` + +## Jolokia Metric Configuration + +Each `metric` declaration generates a Jolokia request to fetch telemetry from a JMX MBean. + +| Key | Required | Description | +|----------------|----------|-------------| +| `mbean` | yes | The object name of a JMX MBean. MBean property-key values can contain a wildcard `*`, allowing you to fetch multiple MBeans with one declaration. | +| `paths` | no | A list of MBean attributes to read. | +| `tag_keys` | no | A list of MBean property-key names to convert into tags. The property-key name becomes the tag name, while the property-key value becomes the tag value. | +| `tag_prefix` | no | A string to prepend to the tag names produced by this `metric` declaration. | +| `field_name` | no | A string to set as the name of the field produced by this metric; can contain substitutions. | +| `field_prefix` | no | A string to prepend to the field names produced by this `metric` declaration; can contain substitutions. | + +Use `paths` to refine which fields to collect. + +```toml +[[inputs.jolokia2_agent.metric]] + name = "jvm_memory" + mbean = "java.lang:type=Memory" + paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"] +``` + +The preceeding `jvm_memory` `metric` declaration produces the following output: + +``` +jvm_memory HeapMemoryUsage.committed=4294967296,HeapMemoryUsage.init=4294967296,HeapMemoryUsage.max=4294967296,HeapMemoryUsage.used=1750658992,NonHeapMemoryUsage.committed=67350528,NonHeapMemoryUsage.init=2555904,NonHeapMemoryUsage.max=-1,NonHeapMemoryUsage.used=65821352,ObjectPendingFinalizationCount=0 1503762436000000000 +``` + +Use `*` wildcards against `mbean` property-key values to create distinct series by capturing values into `tag_keys`. + +```toml +[[inputs.jolokia2_agent.metric]] + name = "jvm_garbage_collector" + mbean = "java.lang:name=*,type=GarbageCollector" + paths = ["CollectionTime", "CollectionCount"] + tag_keys = ["name"] +``` + +Since `name=*` matches both `G1 Old Generation` and `G1 Young Generation`, and `name` is used as a tag, the preceeding `jvm_garbage_collector` `metric` declaration produces two metrics. + +``` +jvm_garbage_collector,name=G1\ Old\ Generation CollectionCount=0,CollectionTime=0 1503762520000000000 +jvm_garbage_collector,name=G1\ Young\ Generation CollectionTime=32,CollectionCount=2 1503762520000000000 +``` + +Use `tag_prefix` along with `tag_keys` to add detail to tag names. + +```toml +[[inputs.jolokia2_agent.metric]] + name = "jvm_memory_pool" + mbean = "java.lang:name=*,type=MemoryPool" + paths = ["Usage", "PeakUsage", "CollectionUsage"] + tag_keys = ["name"] + tag_prefix = "pool_" +``` + +The preceeding `jvm_memory_pool` `metric` declaration produces six metrics, each with a distinct `pool_name` tag. + +``` +jvm_memory_pool,pool_name=Compressed\ Class\ Space PeakUsage.max=1073741824,PeakUsage.committed=3145728,PeakUsage.init=0,Usage.committed=3145728,Usage.init=0,PeakUsage.used=3017976,Usage.max=1073741824,Usage.used=3017976 1503764025000000000 +jvm_memory_pool,pool_name=Code\ Cache PeakUsage.init=2555904,PeakUsage.committed=6291456,Usage.committed=6291456,PeakUsage.used=6202752,PeakUsage.max=251658240,Usage.used=6210368,Usage.max=251658240,Usage.init=2555904 1503764025000000000 +jvm_memory_pool,pool_name=G1\ Eden\ Space CollectionUsage.max=-1,PeakUsage.committed=56623104,PeakUsage.init=56623104,PeakUsage.used=53477376,Usage.max=-1,Usage.committed=49283072,Usage.used=19922944,CollectionUsage.committed=49283072,CollectionUsage.init=56623104,CollectionUsage.used=0,PeakUsage.max=-1,Usage.init=56623104 1503764025000000000 +jvm_memory_pool,pool_name=G1\ Old\ Gen CollectionUsage.max=1073741824,CollectionUsage.committed=0,PeakUsage.max=1073741824,PeakUsage.committed=1017118720,PeakUsage.init=1017118720,PeakUsage.used=137032208,Usage.max=1073741824,CollectionUsage.init=1017118720,Usage.committed=1017118720,Usage.init=1017118720,Usage.used=134708752,CollectionUsage.used=0 1503764025000000000 +jvm_memory_pool,pool_name=G1\ Survivor\ Space Usage.max=-1,Usage.init=0,CollectionUsage.max=-1,CollectionUsage.committed=7340032,CollectionUsage.used=7340032,PeakUsage.committed=7340032,Usage.committed=7340032,Usage.used=7340032,CollectionUsage.init=0,PeakUsage.max=-1,PeakUsage.init=0,PeakUsage.used=7340032 1503764025000000000 +jvm_memory_pool,pool_name=Metaspace PeakUsage.init=0,PeakUsage.used=21852224,PeakUsage.max=-1,Usage.max=-1,Usage.committed=22282240,Usage.init=0,Usage.used=21852224,PeakUsage.committed=22282240 1503764025000000000 +``` + +Use substitutions to create fields and field prefixes with MBean property-keys captured by wildcards. In the following example, `$1` represents the value of the property-key `name`, and `$2` represents the value of the property-key `topic`. + +```toml +[[inputs.jolokia2_agent.metric]] + name = "kafka_topic" + mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics" + field_prefix = "$1" + tag_keys = ["topic"] +``` + +The preceeding `kafka_topic` `metric` declaration produces a metric per Kafka topic. The `name` Mbean property-key is used as a field prefix to aid in gathering fields together into the single metric. + +``` +kafka_topic,topic=my-topic BytesOutPerSec.MeanRate=0,FailedProduceRequestsPerSec.MeanRate=0,BytesOutPerSec.EventType="bytes",BytesRejectedPerSec.Count=0,FailedProduceRequestsPerSec.RateUnit="SECONDS",FailedProduceRequestsPerSec.EventType="requests",MessagesInPerSec.RateUnit="SECONDS",BytesInPerSec.EventType="bytes",BytesOutPerSec.RateUnit="SECONDS",BytesInPerSec.OneMinuteRate=0,FailedFetchRequestsPerSec.EventType="requests",TotalFetchRequestsPerSec.MeanRate=146.301533938701,BytesOutPerSec.FifteenMinuteRate=0,TotalProduceRequestsPerSec.MeanRate=0,BytesRejectedPerSec.FifteenMinuteRate=0,MessagesInPerSec.FiveMinuteRate=0,BytesInPerSec.Count=0,BytesRejectedPerSec.MeanRate=0,FailedFetchRequestsPerSec.MeanRate=0,FailedFetchRequestsPerSec.FiveMinuteRate=0,FailedFetchRequestsPerSec.FifteenMinuteRate=0,FailedProduceRequestsPerSec.Count=0,TotalFetchRequestsPerSec.FifteenMinuteRate=128.59314292334466,TotalFetchRequestsPerSec.OneMinuteRate=126.71551273850747,TotalFetchRequestsPerSec.Count=1353483,TotalProduceRequestsPerSec.FifteenMinuteRate=0,FailedFetchRequestsPerSec.OneMinuteRate=0,FailedFetchRequestsPerSec.Count=0,FailedProduceRequestsPerSec.FifteenMinuteRate=0,TotalFetchRequestsPerSec.FiveMinuteRate=130.8516148751592,TotalFetchRequestsPerSec.RateUnit="SECONDS",BytesRejectedPerSec.RateUnit="SECONDS",BytesInPerSec.MeanRate=0,FailedFetchRequestsPerSec.RateUnit="SECONDS",BytesRejectedPerSec.OneMinuteRate=0,BytesOutPerSec.Count=0,BytesOutPerSec.OneMinuteRate=0,MessagesInPerSec.FifteenMinuteRate=0,MessagesInPerSec.MeanRate=0,BytesInPerSec.FiveMinuteRate=0,TotalProduceRequestsPerSec.RateUnit="SECONDS",FailedProduceRequestsPerSec.OneMinuteRate=0,TotalProduceRequestsPerSec.EventType="requests",BytesRejectedPerSec.FiveMinuteRate=0,BytesRejectedPerSec.EventType="bytes",BytesOutPerSec.FiveMinuteRate=0,FailedProduceRequestsPerSec.FiveMinuteRate=0,MessagesInPerSec.Count=0,TotalProduceRequestsPerSec.FiveMinuteRate=0,TotalProduceRequestsPerSec.OneMinuteRate=0,MessagesInPerSec.EventType="messages",MessagesInPerSec.OneMinuteRate=0,TotalFetchRequestsPerSec.EventType="requests",BytesInPerSec.RateUnit="SECONDS",BytesInPerSec.FifteenMinuteRate=0,TotalProduceRequestsPerSec.Count=0 1503767532000000000 +``` + +Both `jolokia2_agent` and `jolokia2_proxy` plugins support default configurations that apply to every `metric` declaration. + +| Key | Default Value | Description | +|---------------------------|---------------|-------------| +| `default_field_separator` | `.` | A character to use to join Mbean attributes when creating fields. | +| `default_field_prefix` | _None_ | A string to prepend to the field names produced by all `metric` declarations. | +| `default_tag_prefix` | _None_ | A string to prepend to the tag names produced by all `metric` declarations. | diff --git a/plugins/inputs/jolokia2/client.go b/plugins/inputs/jolokia2/client.go new file mode 100644 index 000000000..aa9a8f87b --- /dev/null +++ b/plugins/inputs/jolokia2/client.go @@ -0,0 +1,271 @@ +package jolokia2 + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + "time" + + "github.com/influxdata/telegraf/internal" +) + +type Client struct { + URL string + client *http.Client + config *ClientConfig +} + +type ClientConfig struct { + ResponseTimeout time.Duration + Username string + Password string + SSLCA string + SSLCert string + SSLKey string + InsecureSkipVerify bool + + ProxyConfig *ProxyConfig +} + +type ProxyConfig struct { + DefaultTargetUsername string + DefaultTargetPassword string + Targets []ProxyTargetConfig +} + +type ProxyTargetConfig struct { + Username string + Password string + URL string +} + +type ReadRequest struct { + Mbean string + Attributes []string + Path string +} + +type ReadResponse struct { + Status int + Value interface{} + RequestMbean string + RequestAttributes []string + RequestPath string + RequestTarget string +} + +// Jolokia JSON request object. Example: { +// "type": "read", +// "mbean: "java.lang:type="Runtime", +// "attribute": "Uptime", +// "target": { +// "url: "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi" +// } +// } +type jolokiaRequest struct { + Type string `json:"type"` + Mbean string `json:"mbean"` + Attribute interface{} `json:"attribute,omitempty"` + Path string `json:"path,omitempty"` + Target *jolokiaTarget `json:"target,omitempty"` +} + +type jolokiaTarget struct { + URL string `json:"url"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` +} + +// Jolokia JSON response object. Example: { +// "request": { +// "type": "read" +// "mbean": "java.lang:type=Runtime", +// "attribute": "Uptime", +// "target": { +// "url": "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi" +// } +// }, +// "value": 1214083, +// "timestamp": 1488059309, +// "status": 200 +// } +type jolokiaResponse struct { + Request jolokiaRequest `json:"request"` + Value interface{} `json:"value"` + Status int `json:"status"` +} + +func NewClient(url string, config *ClientConfig) (*Client, error) { + tlsConfig, err := internal.GetTLSConfig( + config.SSLCert, config.SSLKey, config.SSLCA, config.InsecureSkipVerify) + if err != nil { + return nil, err + } + + transport := &http.Transport{ + ResponseHeaderTimeout: config.ResponseTimeout, + TLSClientConfig: tlsConfig, + } + + client := &http.Client{ + Transport: transport, + Timeout: config.ResponseTimeout, + } + + return &Client{ + URL: url, + config: config, + client: client, + }, nil +} + +func (c *Client) read(requests []ReadRequest) ([]ReadResponse, error) { + jrequests := makeJolokiaRequests(requests, c.config.ProxyConfig) + requestBody, err := json.Marshal(jrequests) + if err != nil { + return nil, err + } + + requestUrl, err := formatReadUrl(c.URL, c.config.Username, c.config.Password) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", requestUrl, bytes.NewBuffer(requestBody)) + req.Header.Add("Content-type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + c.URL, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, http.StatusText(http.StatusOK)) + } + + responseBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var jresponses []jolokiaResponse + if err = json.Unmarshal([]byte(responseBody), &jresponses); err != nil { + return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, responseBody) + } + + return makeReadResponses(jresponses), nil +} + +func makeJolokiaRequests(rrequests []ReadRequest, proxyConfig *ProxyConfig) []jolokiaRequest { + jrequests := make([]jolokiaRequest, 0) + if proxyConfig == nil { + for _, rr := range rrequests { + jrequests = append(jrequests, makeJolokiaRequest(rr, nil)) + } + } else { + for _, t := range proxyConfig.Targets { + if t.Username == "" { + t.Username = proxyConfig.DefaultTargetUsername + } + if t.Password == "" { + t.Password = proxyConfig.DefaultTargetPassword + } + + for _, rr := range rrequests { + jtarget := &jolokiaTarget{ + URL: t.URL, + User: t.Username, + Password: t.Password, + } + + jrequests = append(jrequests, makeJolokiaRequest(rr, jtarget)) + } + } + } + + return jrequests +} + +func makeJolokiaRequest(rrequest ReadRequest, jtarget *jolokiaTarget) jolokiaRequest { + jrequest := jolokiaRequest{ + Type: "read", + Mbean: rrequest.Mbean, + Path: rrequest.Path, + Target: jtarget, + } + + if len(rrequest.Attributes) == 1 { + jrequest.Attribute = rrequest.Attributes[0] + } + if len(rrequest.Attributes) > 1 { + jrequest.Attribute = rrequest.Attributes + } + + return jrequest +} + +func makeReadResponses(jresponses []jolokiaResponse) []ReadResponse { + rresponses := make([]ReadResponse, 0) + + for _, jr := range jresponses { + rrequest := ReadRequest{ + Mbean: jr.Request.Mbean, + Path: jr.Request.Path, + Attributes: []string{}, + } + + attrValue := jr.Request.Attribute + if attrValue != nil { + attribute, ok := attrValue.(string) + if ok { + rrequest.Attributes = []string{attribute} + } else { + attributes, _ := attrValue.([]interface{}) + rrequest.Attributes = make([]string, len(attributes)) + for i, attr := range attributes { + rrequest.Attributes[i] = attr.(string) + } + } + } + rresponse := ReadResponse{ + Value: jr.Value, + Status: jr.Status, + RequestMbean: rrequest.Mbean, + RequestAttributes: rrequest.Attributes, + RequestPath: rrequest.Path, + } + if jtarget := jr.Request.Target; jtarget != nil { + rresponse.RequestTarget = jtarget.URL + } + + rresponses = append(rresponses, rresponse) + } + + return rresponses +} + +func formatReadUrl(configUrl, username, password string) (string, error) { + parsedUrl, err := url.Parse(configUrl) + if err != nil { + return "", err + } + + readUrl := url.URL{ + Host: parsedUrl.Host, + Scheme: parsedUrl.Scheme, + } + + if username != "" || password != "" { + readUrl.User = url.UserPassword(username, password) + } + + readUrl.Path = path.Join(parsedUrl.Path, "read") + readUrl.Query().Add("ignoreErrors", "true") + return readUrl.String(), nil +} diff --git a/plugins/inputs/jolokia2/client_test.go b/plugins/inputs/jolokia2/client_test.go new file mode 100644 index 000000000..0c7cd4c01 --- /dev/null +++ b/plugins/inputs/jolokia2/client_test.go @@ -0,0 +1,129 @@ +package jolokia2 + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func TestJolokia2_ClientAuthRequest(t *testing.T) { + var username string + var password string + var requests []map[string]interface{} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, _ = r.BasicAuth() + + body, _ := ioutil.ReadAll(r.Body) + err := json.Unmarshal(body, &requests) + if err != nil { + t.Error(err) + } + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + plugin := setupPlugin(t, fmt.Sprintf(` + [jolokia2_agent] + urls = ["%s/jolokia"] + username = "sally" + password = "seashore" + [[jolokia2_agent.metric]] + name = "hello" + mbean = "hello:foo=bar" + `, server.URL)) + + var acc testutil.Accumulator + plugin.Gather(&acc) + + if username != "sally" { + t.Errorf("Expected to post with username %s, but was %s", "sally", username) + } + if password != "seashore" { + t.Errorf("Expected to post with password %s, but was %s", "seashore", password) + } + if len(requests) == 0 { + t.Fatal("Expected to post a request body, but was empty.") + } + + request := requests[0] + if expect := "hello:foo=bar"; request["mbean"] != expect { + t.Errorf("Expected to query mbean %s, but was %s", expect, request["mbean"]) + } +} + +func TestJolokia2_ClientProxyAuthRequest(t *testing.T) { + var requests []map[string]interface{} + + var username string + var password string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, _ = r.BasicAuth() + + body, _ := ioutil.ReadAll(r.Body) + err := json.Unmarshal(body, &requests) + if err != nil { + t.Error(err) + } + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + plugin := setupPlugin(t, fmt.Sprintf(` + [jolokia2_proxy] + url = "%s/jolokia" + username = "sally" + password = "seashore" + + [[jolokia2_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi" + username = "jack" + password = "benimble" + + [[jolokia2_proxy.metric]] + name = "hello" + mbean = "hello:foo=bar" + `, server.URL)) + + var acc testutil.Accumulator + plugin.Gather(&acc) + + if username != "sally" { + t.Errorf("Expected to post with username %s, but was %s", "sally", username) + } + if password != "seashore" { + t.Errorf("Expected to post with password %s, but was %s", "seashore", password) + } + if len(requests) == 0 { + t.Fatal("Expected to post a request body, but was empty.") + } + + request := requests[0] + if expect := "hello:foo=bar"; request["mbean"] != expect { + t.Errorf("Expected to query mbean %s, but was %s", expect, request["mbean"]) + } + + target, ok := request["target"].(map[string]interface{}) + if !ok { + t.Fatal("Expected a proxy target, but was empty.") + } + + if expect := "service:jmx:rmi:///jndi/rmi://target:9010/jmxrmi"; target["url"] != expect { + t.Errorf("Expected proxy target url %s, but was %s", expect, target["url"]) + } + + if expect := "jack"; target["user"] != expect { + t.Errorf("Expected proxy target username %s, but was %s", expect, target["user"]) + } + + if expect := "benimble"; target["password"] != expect { + t.Errorf("Expected proxy target password %s, but was %s", expect, target["password"]) + } +} diff --git a/plugins/inputs/jolokia2/examples/java.conf b/plugins/inputs/jolokia2/examples/java.conf new file mode 100644 index 000000000..361bce1d2 --- /dev/null +++ b/plugins/inputs/jolokia2/examples/java.conf @@ -0,0 +1,40 @@ + +[[inputs.jolokia2_agent]] + urls = ["http://localhost:8080/jolokia"] + + [[inputs.jolokia2_agent.metric]] + name = "java_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] + + [[inputs.jolokia2_agent.metric]] + name = "java_memory" + mbean = "java.lang:type=Memory" + paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"] + + [[inputs.jolokia2_agent.metric]] + name = "java_garbage_collector" + mbean = "java.lang:name=G1*,type=GarbageCollector" + paths = ["CollectionTime", "CollectionCount"] + tag_keys = ["name"] + + [[inputs.jolokia2_agent.metric]] + name = "java_last_garbage_collection" + mbean = "java.lang:name=G1 Young Generation,type=GarbageCollector" + paths = ["LastGcInfo/duration", "LastGcInfo/GcThreadCount", "LastGcInfo/memoryUsageAfterGc"] + + [[inputs.jolokia2_agent.metrics]] + name = "java_threading" + mbean = "java.lang:type=Threading" + paths = ["TotalStartedThreadCount", "ThreadCount", "DaemonThreadCount", "PeakThreadCount"] + + [[inputs.jolokia2_agent.metrics]] + name = "java_class_loading" + mbean = "java.lang:type=ClassLoading" + paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"] + + [[inputs.jolokia2_agent.metrics]] + name = "java_memory_pool" + mbean = "java.lang:name=*,type=MemoryPool" + paths = ["Usage", "PeakUsage", "CollectionUsage"] + tag_keys = ["name"] diff --git a/plugins/inputs/jolokia2/examples/kafka.conf b/plugins/inputs/jolokia2/examples/kafka.conf new file mode 100644 index 000000000..ae34831fc --- /dev/null +++ b/plugins/inputs/jolokia2/examples/kafka.conf @@ -0,0 +1,55 @@ + +[[inputs.jolokia2_agent]] + name_prefix = "kafka_" + + urls = ["http://localhost:8080/jolokia"] + + [[inputs.jolokia2_agent.metric]] + name = "controller" + mbean = "kafka.controller:name=*,type=*" + field_prefix = "$1." + + [[inputs.jolokia2_agent.metric]] + name = "replica_manager" + mbean = "kafka.server:name=*,type=ReplicaManager" + field_prefix = "$1." + + [[inputs.jolokia2_agent.metric]] + name = "purgatory" + mbean = "kafka.server:delayedOperation=*,name=*,type=DelayedOperationPurgatory" + field_prefix = "$1." + field_name = "$2" + + [[inputs.jolokia2_agent.metric]] + name = "client" + mbean = "kafka.server:client-id=*,type=*" + tag_keys = ["client-id", "type"] + + [[inputs.jolokia2_agent.metric]] + name = "request" + mbean = "kafka.network:name=*,request=*,type=RequestMetrics" + field_prefix = "$1." + tag_keys = ["request"] + + [[inputs.jolokia2_agent.metric]] + name = "topics" + mbean = "kafka.server:name=*,type=BrokerTopicMetrics" + field_prefix = "$1." + + [[inputs.jolokia2_agent.metric]] + name = "topic" + mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics" + field_prefix = "$1." + tag_keys = ["topic"] + + [[inputs.jolokia2_agent.metric]] + name = "partition" + mbean = "kafka.log:name=*,partition=*,topic=*,type=Log" + field_name = "$1" + tag_keys = ["topic", "partition"] + + [[inputs.jolokia2_agent.metric]] + name = "partition" + mbean = "kafka.cluster:name=UnderReplicated,partition=*,topic=*,type=Partition" + field_name = "UnderReplicatedPartitions" + tag_keys = ["topic", "partition"] diff --git a/plugins/inputs/jolokia2/gatherer.go b/plugins/inputs/jolokia2/gatherer.go new file mode 100644 index 000000000..3cc2e1217 --- /dev/null +++ b/plugins/inputs/jolokia2/gatherer.go @@ -0,0 +1,277 @@ +package jolokia2 + +import ( + "fmt" + "sort" + "strings" + + "github.com/influxdata/telegraf" +) + +const defaultFieldName = "value" + +type Gatherer struct { + metrics []Metric + requests []ReadRequest +} + +func NewGatherer(metrics []Metric) *Gatherer { + return &Gatherer{ + metrics: metrics, + requests: makeReadRequests(metrics), + } +} + +// Gather adds points to an accumulator from responses returned +// by a Jolokia agent. +func (g *Gatherer) Gather(client *Client, acc telegraf.Accumulator) error { + var tags map[string]string + + if client.config.ProxyConfig != nil { + tags = map[string]string{"jolokia_proxy_url": client.URL} + } else { + tags = map[string]string{"jolokia_agent_url": client.URL} + } + + requests := makeReadRequests(g.metrics) + responses, err := client.read(requests) + if err != nil { + return err + } + + g.gatherResponses(responses, tags, acc) + return nil +} + +// gatherReponses adds points to an accumulator from the ReadResponse objects +// returned by a Jolokia agent. +func (g *Gatherer) gatherResponses(responses []ReadResponse, tags map[string]string, acc telegraf.Accumulator) { + series := make(map[string][]point, 0) + + for _, metric := range g.metrics { + points, ok := series[metric.Name] + if !ok { + points = make([]point, 0) + } + + responsePoints, responseErrors := g.generatePoints(metric, responses) + + for _, responsePoint := range responsePoints { + points = append(points, responsePoint) + } + + for _, err := range responseErrors { + acc.AddError(err) + } + + series[metric.Name] = points + } + + for measurement, points := range series { + for _, point := range compactPoints(points) { + acc.AddFields(measurement, + point.Fields, mergeTags(point.Tags, tags)) + } + } +} + +// generatePoints creates points for the supplied metric from the ReadResponse +// objects returned by the Jolokia client. +func (g *Gatherer) generatePoints(metric Metric, responses []ReadResponse) ([]point, []error) { + points := make([]point, 0) + errors := make([]error, 0) + + for _, response := range responses { + switch response.Status { + case 200: + break + case 404: + continue + default: + errors = append(errors, fmt.Errorf("Unexpected status in response from target %s: %d", + response.RequestTarget, response.Status)) + continue + } + + if !metricMatchesResponse(metric, response) { + continue + } + + pb := newPointBuilder(metric, response.RequestAttributes, response.RequestPath) + for _, point := range pb.Build(metric.Mbean, response.Value) { + if response.RequestTarget != "" { + point.Tags["jolokia_agent_url"] = response.RequestTarget + } + + points = append(points, point) + } + } + + return points, errors +} + +// mergeTags combines two tag sets into a single tag set. +func mergeTags(metricTags, outerTags map[string]string) map[string]string { + tags := make(map[string]string) + for k, v := range outerTags { + tags[k] = v + } + for k, v := range metricTags { + tags[k] = v + } + + return tags +} + +// metricMatchesResponse returns true when the name, attributes, and path +// of a Metric match the corresponding elements in a ReadResponse object +// returned by a Jolokia agent. +func metricMatchesResponse(metric Metric, response ReadResponse) bool { + + if metric.Mbean != response.RequestMbean { + return false + } + + if len(metric.Paths) == 0 { + return len(response.RequestAttributes) == 0 + } + + for _, fullPath := range metric.Paths { + segments := strings.SplitN(fullPath, "/", 2) + attribute := segments[0] + + var path string + if len(segments) == 2 { + path = segments[1] + } + + for _, rattr := range response.RequestAttributes { + if attribute == rattr && path == response.RequestPath { + return true + } + } + } + + return false +} + +// compactPoints attepts to remove points by compacting points +// with matching tag sets. When a match is found, the fields from +// one point are moved to another, and the empty point is removed. +func compactPoints(points []point) []point { + compactedPoints := make([]point, 0) + + for _, sourcePoint := range points { + keepPoint := true + + for _, compactPoint := range compactedPoints { + if !tagSetsMatch(sourcePoint.Tags, compactPoint.Tags) { + continue + } + + keepPoint = false + for key, val := range sourcePoint.Fields { + compactPoint.Fields[key] = val + } + } + + if keepPoint { + compactedPoints = append(compactedPoints, sourcePoint) + } + } + + return compactedPoints +} + +// tagSetsMatch returns true if two maps are equivalent. +func tagSetsMatch(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + + for ak, av := range a { + bv, ok := b[ak] + if !ok { + return false + } + if av != bv { + return false + } + } + + return true +} + +// makeReadRequests creates ReadRequest objects from metrics definitions. +func makeReadRequests(metrics []Metric) []ReadRequest { + var requests []ReadRequest + for _, metric := range metrics { + + if len(metric.Paths) == 0 { + requests = append(requests, ReadRequest{ + Mbean: metric.Mbean, + Attributes: []string{}, + }) + } else { + attributes := make(map[string][]string) + + for _, path := range metric.Paths { + segments := strings.Split(path, "/") + attribute := segments[0] + + if _, ok := attributes[attribute]; !ok { + attributes[attribute] = make([]string, 0) + } + + if len(segments) > 1 { + paths := attributes[attribute] + attributes[attribute] = append(paths, strings.Join(segments[1:], "/")) + } + } + + rootAttributes := findRequestAttributesWithoutPaths(attributes) + if len(rootAttributes) > 0 { + requests = append(requests, ReadRequest{ + Mbean: metric.Mbean, + Attributes: rootAttributes, + }) + } + + for _, deepAttribute := range findRequestAttributesWithPaths(attributes) { + for _, path := range attributes[deepAttribute] { + requests = append(requests, ReadRequest{ + Mbean: metric.Mbean, + Attributes: []string{deepAttribute}, + Path: path, + }) + } + } + } + } + + return requests +} + +func findRequestAttributesWithoutPaths(attributes map[string][]string) []string { + results := make([]string, 0) + for attr, paths := range attributes { + if len(paths) == 0 { + results = append(results, attr) + } + } + + sort.Strings(results) + return results +} + +func findRequestAttributesWithPaths(attributes map[string][]string) []string { + results := make([]string, 0) + for attr, paths := range attributes { + if len(paths) != 0 { + results = append(results, attr) + } + } + + sort.Strings(results) + return results +} diff --git a/plugins/inputs/jolokia2/gatherer_test.go b/plugins/inputs/jolokia2/gatherer_test.go new file mode 100644 index 000000000..ca83cf0ac --- /dev/null +++ b/plugins/inputs/jolokia2/gatherer_test.go @@ -0,0 +1,104 @@ +package jolokia2 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJolokia2_makeReadRequests(t *testing.T) { + cases := []struct { + metric Metric + expected []ReadRequest + }{ + { + metric: Metric{ + Name: "object", + Mbean: "test:foo=bar", + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{}, + }, + }, + }, { + metric: Metric{ + Name: "object_with_an_attribute", + Mbean: "test:foo=bar", + Paths: []string{"biz"}, + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"biz"}, + }, + }, + }, { + metric: Metric{ + Name: "object_with_attributes", + Mbean: "test:foo=bar", + Paths: []string{"baz", "biz"}, + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"baz", "biz"}, + }, + }, + }, { + metric: Metric{ + Name: "object_with_an_attribute_and_path", + Mbean: "test:foo=bar", + Paths: []string{"biz/baz"}, + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"biz"}, + Path: "baz", + }, + }, + }, { + metric: Metric{ + Name: "object_with_an_attribute_and_a_deep_path", + Mbean: "test:foo=bar", + Paths: []string{"biz/baz/fiz/faz"}, + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"biz"}, + Path: "baz/fiz/faz", + }, + }, + }, { + metric: Metric{ + Name: "object_with_attributes_and_paths", + Mbean: "test:foo=bar", + Paths: []string{"baz/biz", "faz/fiz"}, + }, + expected: []ReadRequest{ + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"baz"}, + Path: "biz", + }, + ReadRequest{ + Mbean: "test:foo=bar", + Attributes: []string{"faz"}, + Path: "fiz", + }, + }, + }, + } + + for _, c := range cases { + payload := makeReadRequests([]Metric{c.metric}) + + assert.Equal(t, len(c.expected), len(payload), "Failing case: "+c.metric.Name) + for _, actual := range payload { + assert.Contains(t, c.expected, actual, "Failing case: "+c.metric.Name) + } + } +} diff --git a/plugins/inputs/jolokia2/jolokia.go b/plugins/inputs/jolokia2/jolokia.go new file mode 100644 index 000000000..430f58741 --- /dev/null +++ b/plugins/inputs/jolokia2/jolokia.go @@ -0,0 +1,21 @@ +package jolokia2 + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +func init() { + inputs.Add("jolokia2_agent", func() telegraf.Input { + return &JolokiaAgent{ + Metrics: []MetricConfig{}, + DefaultFieldSeparator: ".", + } + }) + inputs.Add("jolokia2_proxy", func() telegraf.Input { + return &JolokiaProxy{ + Metrics: []MetricConfig{}, + DefaultFieldSeparator: ".", + } + }) +} diff --git a/plugins/inputs/jolokia2/jolokia_agent.go b/plugins/inputs/jolokia2/jolokia_agent.go new file mode 100644 index 000000000..ff37fdaf6 --- /dev/null +++ b/plugins/inputs/jolokia2/jolokia_agent.go @@ -0,0 +1,112 @@ +package jolokia2 + +import ( + "fmt" + "sync" + "time" + + "github.com/influxdata/telegraf" +) + +type JolokiaAgent struct { + DefaultFieldPrefix string + DefaultFieldSeparator string + DefaultTagPrefix string + + URLs []string `toml:"urls"` + Username string + Password string + ResponseTimeout time.Duration `toml:"response_timeout"` + + SSLCA string `toml:"ssl_ca"` + SSLCert string `toml:"ssl_cert"` + SSLKey string `toml:"ssl_key"` + InsecureSkipVerify bool + + Metrics []MetricConfig `toml:"metric"` + gatherer *Gatherer +} + +func (ja *JolokiaAgent) SampleConfig() string { + return ` + # default_tag_prefix = "" + # default_field_prefix = "" + # default_field_separator = "." + + # Add agents URLs to query + urls = ["http://localhost:8080/jolokia"] + # username = "" + # password = "" + # response_timeout = "5s" + + ## Optional SSL config + # ssl_ca = "/var/private/ca.pem" + # ssl_cert = "/var/private/client.pem" + # ssl_key = "/var/private/client-key.pem" + # insecure_skip_verify = false + + ## Add metrics to read + [[inputs.jolokia2.metric]] + name = "java_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +` +} + +func (ja *JolokiaAgent) Description() string { + return "Read JMX metrics from a Jolokia REST agent endpoint" +} + +func (ja *JolokiaAgent) Gather(acc telegraf.Accumulator) error { + if ja.gatherer == nil { + ja.gatherer = NewGatherer(ja.createMetrics()) + } + + var wg sync.WaitGroup + + for _, url := range ja.URLs { + client, err := ja.createClient(url) + if err != nil { + acc.AddError(fmt.Errorf("Unable to create client for %s: %v", url, err)) + continue + } + + wg.Add(1) + go func(client *Client) { + defer wg.Done() + + err = ja.gatherer.Gather(client, acc) + if err != nil { + acc.AddError(fmt.Errorf("Unable to gather metrics for %s: %v", client.URL, err)) + } + + }(client) + } + + wg.Wait() + + return nil +} + +func (ja *JolokiaAgent) createMetrics() []Metric { + var metrics []Metric + + for _, config := range ja.Metrics { + metrics = append(metrics, NewMetric(config, + ja.DefaultFieldPrefix, ja.DefaultFieldSeparator, ja.DefaultTagPrefix)) + } + + return metrics +} + +func (ja *JolokiaAgent) createClient(url string) (*Client, error) { + return NewClient(url, &ClientConfig{ + Username: ja.Username, + Password: ja.Password, + ResponseTimeout: ja.ResponseTimeout, + SSLCA: ja.SSLCA, + SSLCert: ja.SSLCert, + SSLKey: ja.SSLKey, + InsecureSkipVerify: ja.InsecureSkipVerify, + }) +} diff --git a/plugins/inputs/jolokia2/jolokia_proxy.go b/plugins/inputs/jolokia2/jolokia_proxy.go new file mode 100644 index 000000000..f19a74e84 --- /dev/null +++ b/plugins/inputs/jolokia2/jolokia_proxy.go @@ -0,0 +1,129 @@ +package jolokia2 + +import ( + "time" + + "github.com/influxdata/telegraf" +) + +type JolokiaProxy struct { + DefaultFieldPrefix string + DefaultFieldSeparator string + DefaultTagPrefix string + + URL string `toml:"url"` + DefaultTargetPassword string + DefaultTargetUsername string + Targets []JolokiaProxyTargetConfig `toml:"target"` + + Username string + Password string + SSLCA string `toml:"ssl_ca"` + SSLCert string `toml:"ssl_cert"` + SSLKey string `toml:"ssl_key"` + InsecureSkipVerify bool + ResponseTimeout time.Duration `toml:"response_timeout"` + + Metrics []MetricConfig `toml:"metric"` + client *Client + gatherer *Gatherer +} + +type JolokiaProxyTargetConfig struct { + URL string `toml:"url"` + Username string + Password string +} + +func (jp *JolokiaProxy) SampleConfig() string { + return ` + # default_tag_prefix = "" + # default_field_prefix = "" + # default_field_separator = "." + + ## Proxy agent + url = "http://localhost:8080/jolokia" + # username = "" + # password = "" + # response_timeout = "5s" + + ## Optional SSL config + # ssl_ca = "/var/private/ca.pem" + # ssl_cert = "/var/private/client.pem" + # ssl_key = "/var/private/client-key.pem" + # insecure_skip_verify = false + + ## Add proxy targets to query + # default_target_username = "" + # default_target_password = "" + [[inputs.jolokia_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi" + # username = "" + # password = "" + + ## Add metrics to read + [[inputs.jolokia_proxy.metric]] + name = "java_runtime" + mbean = "java.lang:type=Runtime" + paths = ["Uptime"] +` +} + +func (jp *JolokiaProxy) Description() string { + return "Read JMX metrics from a Jolokia REST proxy endpoint" +} + +func (jp *JolokiaProxy) Gather(acc telegraf.Accumulator) error { + if jp.gatherer == nil { + jp.gatherer = NewGatherer(jp.createMetrics()) + } + + if jp.client == nil { + client, err := jp.createClient() + + if err != nil { + return err + } + + jp.client = client + } + + return jp.gatherer.Gather(jp.client, acc) +} + +func (jp *JolokiaProxy) createMetrics() []Metric { + var metrics []Metric + + for _, config := range jp.Metrics { + metrics = append(metrics, NewMetric(config, + jp.DefaultFieldPrefix, jp.DefaultFieldSeparator, jp.DefaultTagPrefix)) + } + + return metrics +} + +func (jp *JolokiaProxy) createClient() (*Client, error) { + proxyConfig := &ProxyConfig{ + DefaultTargetUsername: jp.DefaultTargetUsername, + DefaultTargetPassword: jp.DefaultTargetPassword, + } + + for _, target := range jp.Targets { + proxyConfig.Targets = append(proxyConfig.Targets, ProxyTargetConfig{ + URL: target.URL, + Username: target.Username, + Password: target.Password, + }) + } + + return NewClient(jp.URL, &ClientConfig{ + Username: jp.Username, + Password: jp.Password, + ResponseTimeout: jp.ResponseTimeout, + SSLCA: jp.SSLCA, + SSLCert: jp.SSLCert, + SSLKey: jp.SSLKey, + InsecureSkipVerify: jp.InsecureSkipVerify, + ProxyConfig: proxyConfig, + }) +} diff --git a/plugins/inputs/jolokia2/jolokia_test.go b/plugins/inputs/jolokia2/jolokia_test.go new file mode 100644 index 000000000..dfdc4bef9 --- /dev/null +++ b/plugins/inputs/jolokia2/jolokia_test.go @@ -0,0 +1,673 @@ +package jolokia2 + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + "github.com/stretchr/testify/assert" +) + +func TestJolokia2_ScalarValues(t *testing.T) { + config := ` + [jolokia2_agent] + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "scalar_without_attribute" + mbean = "scalar_without_attribute" + + [[jolokia2_agent.metric]] + name = "scalar_with_attribute" + mbean = "scalar_with_attribute" + paths = ["biz"] + + [[jolokia2_agent.metric]] + name = "scalar_with_attribute_and_path" + mbean = "scalar_with_attribute_and_path" + paths = ["biz/baz"] + + # This should return multiple series with different test tags. + [[jolokia2_agent.metric]] + name = "scalar_with_key_pattern" + mbean = "scalar_with_key_pattern:test=*" + tag_keys = ["test"]` + + response := `[{ + "request": { + "mbean": "scalar_without_attribute", + "type": "read" + }, + "value": 123, + "status": 200 + }, { + "request": { + "mbean": "scalar_with_attribute", + "attribute": "biz", + "type": "read" + }, + "value": 456, + "status": 200 + }, { + "request": { + "mbean": "scalar_with_attribute_and_path", + "attribute": "biz", + "path": "baz", + "type": "read" + }, + "value": 789, + "status": 200 + }, { + "request": { + "mbean": "scalar_with_key_pattern:test=*", + "type": "read" + }, + "value": { + "scalar_with_key_pattern:test=foo": 123, + "scalar_with_key_pattern:test=bar": 456 + }, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "scalar_without_attribute", map[string]interface{}{ + "value": 123.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "scalar_with_attribute", map[string]interface{}{ + "biz": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "scalar_with_attribute_and_path", map[string]interface{}{ + "biz.baz": 789.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "scalar_with_key_pattern", map[string]interface{}{ + "value": 123.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + "test": "foo", + }) + acc.AssertContainsTaggedFields(t, "scalar_with_key_pattern", map[string]interface{}{ + "value": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + "test": "bar", + }) +} + +func TestJolokia2_ObjectValues(t *testing.T) { + config := ` + [jolokia2_agent] + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "object_without_attribute" + mbean = "object_without_attribute" + tag_keys = ["foo"] + + [[jolokia2_agent.metric]] + name = "object_with_attribute" + mbean = "object_with_attribute" + paths = ["biz"] + + [[jolokia2_agent.metric]] + name = "object_with_attribute_and_path" + mbean = "object_with_attribute_and_path" + paths = ["biz/baz"] + + # This will generate two separate request objects. + [[jolokia2_agent.metric]] + name = "object_with_branching_paths" + mbean = "object_with_branching_paths" + paths = ["foo/fiz", "foo/faz"] + + # This should return multiple series with different test tags. + [[jolokia2_agent.metric]] + name = "object_with_key_pattern" + mbean = "object_with_key_pattern:test=*" + tag_keys = ["test"]` + + response := `[{ + "request": { + "mbean": "object_without_attribute", + "type": "read" + }, + "value": { + "biz": 123, + "baz": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "object_with_attribute", + "attribute": "biz", + "type": "read" + }, + "value": { + "fiz": 123, + "faz": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "object_with_branching_paths", + "attribute": "foo", + "path": "fiz", + "type": "read" + }, + "value": { + "bing": 123 + }, + "status": 200 + }, { + "request": { + "mbean": "object_with_branching_paths", + "attribute": "foo", + "path": "faz", + "type": "read" + }, + "value": { + "bang": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "object_with_attribute_and_path", + "attribute": "biz", + "path": "baz", + "type": "read" + }, + "value": { + "bing": 123, + "bang": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "object_with_key_pattern:test=*", + "type": "read" + }, + "value": { + "object_with_key_pattern:test=foo": { + "fiz": 123 + }, + "object_with_key_pattern:test=bar": { + "biz": 456 + } + }, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "object_without_attribute", map[string]interface{}{ + "biz": 123.0, + "baz": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "object_with_attribute", map[string]interface{}{ + "biz.fiz": 123.0, + "biz.faz": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "object_with_attribute_and_path", map[string]interface{}{ + "biz.baz.bing": 123.0, + "biz.baz.bang": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "object_with_branching_paths", map[string]interface{}{ + "foo.fiz.bing": 123.0, + "foo.faz.bang": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "object_with_key_pattern", map[string]interface{}{ + "fiz": 123.0, + }, map[string]string{ + "test": "foo", + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "object_with_key_pattern", map[string]interface{}{ + "biz": 456.0, + }, map[string]string{ + "test": "bar", + "jolokia_agent_url": server.URL, + }) +} + +func TestJolokia2_StatusCodes(t *testing.T) { + config := ` + [jolokia2_agent] + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "ok" + mbean = "ok" + + [[jolokia2_agent.metric]] + name = "not_found" + mbean = "not_found" + + [[jolokia2_agent.metric]] + name = "unknown" + mbean = "unknown"` + + response := `[{ + "request": { + "mbean": "ok", + "type": "read" + }, + "value": 1, + "status": 200 + }, { + "request": { + "mbean": "not_found", + "type": "read" + }, + "status": 404 + }, { + "request": { + "mbean": "unknown", + "type": "read" + }, + "status": 500 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "ok", map[string]interface{}{ + "value": 1.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertDoesNotContainMeasurement(t, "not_found") + acc.AssertDoesNotContainMeasurement(t, "unknown") +} + +func TestJolokia2_TagRenaming(t *testing.T) { + config := ` + [jolokia2_agent] + default_tag_prefix = "DEFAULT_PREFIX_" + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "default_tag_prefix" + mbean = "default_tag_prefix:biz=baz,fiz=faz" + tag_keys = ["biz", "fiz"] + + [[jolokia2_agent.metric]] + name = "custom_tag_prefix" + mbean = "custom_tag_prefix:biz=baz,fiz=faz" + tag_keys = ["biz", "fiz"] + tag_prefix = "CUSTOM_PREFIX_"` + + response := `[{ + "request": { + "mbean": "default_tag_prefix:biz=baz,fiz=faz", + "type": "read" + }, + "value": 123, + "status": 200 + }, { + "request": { + "mbean": "custom_tag_prefix:biz=baz,fiz=faz", + "type": "read" + }, + "value": 123, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "default_tag_prefix", map[string]interface{}{ + "value": 123.0, + }, map[string]string{ + "DEFAULT_PREFIX_biz": "baz", + "DEFAULT_PREFIX_fiz": "faz", + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "custom_tag_prefix", map[string]interface{}{ + "value": 123.0, + }, map[string]string{ + "CUSTOM_PREFIX_biz": "baz", + "CUSTOM_PREFIX_fiz": "faz", + "jolokia_agent_url": server.URL, + }) +} + +func TestJolokia2_FieldRenaming(t *testing.T) { + config := ` + [jolokia2_agent] + default_field_prefix = "DEFAULT_PREFIX_" + default_field_separator = "_DEFAULT_SEPARATOR_" + + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "default_field_modifiers" + mbean = "default_field_modifiers" + + [[jolokia2_agent.metric]] + name = "custom_field_modifiers" + mbean = "custom_field_modifiers" + field_prefix = "CUSTOM_PREFIX_" + field_separator = "_CUSTOM_SEPARATOR_" + + [[jolokia2_agent.metric]] + name = "field_prefix_substitution" + mbean = "field_prefix_substitution:foo=*" + field_prefix = "$1_" + + [[jolokia2_agent.metric]] + name = "field_name_substitution" + mbean = "field_name_substitution:foo=*" + field_prefix = "" + field_name = "$1"` + + response := `[{ + "request": { + "mbean": "default_field_modifiers", + "type": "read" + }, + "value": { + "hello": { "world": 123 } + }, + "status": 200 + }, { + "request": { + "mbean": "custom_field_modifiers", + "type": "read" + }, + "value": { + "hello": { "world": 123 } + }, + "status": 200 + }, { + "request": { + "mbean": "field_prefix_substitution:foo=*", + "type": "read" + }, + "value": { + "field_prefix_substitution:foo=biz": 123, + "field_prefix_substitution:foo=baz": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "field_name_substitution:foo=*", + "type": "read" + }, + "value": { + "field_name_substitution:foo=biz": 123, + "field_name_substitution:foo=baz": 456 + }, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "default_field_modifiers", map[string]interface{}{ + "DEFAULT_PREFIX_hello_DEFAULT_SEPARATOR_world": 123.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "custom_field_modifiers", map[string]interface{}{ + "CUSTOM_PREFIX_hello_CUSTOM_SEPARATOR_world": 123.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "field_prefix_substitution", map[string]interface{}{ + "biz_value": 123.0, + "baz_value": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "field_name_substitution", map[string]interface{}{ + "biz": 123.0, + "baz": 456.0, + }, map[string]string{ + "jolokia_agent_url": server.URL, + }) +} + +func TestJolokia2_MetricCompaction(t *testing.T) { + config := ` + [jolokia2_agent] + urls = ["%s"] + + [[jolokia2_agent.metric]] + name = "compact_metric" + mbean = "scalar_value:flavor=chocolate" + tag_keys = ["flavor"] + + [[jolokia2_agent.metric]] + name = "compact_metric" + mbean = "scalar_value:flavor=vanilla" + tag_keys = ["flavor"] + + [[jolokia2_agent.metric]] + name = "compact_metric" + mbean = "object_value1:flavor=chocolate" + tag_keys = ["flavor"] + + [[jolokia2_agent.metric]] + name = "compact_metric" + mbean = "object_value2:flavor=chocolate" + tag_keys = ["flavor"]` + + response := `[{ + "request": { + "mbean": "scalar_value:flavor=chocolate", + "type": "read" + }, + "value": 123, + "status": 200 + }, { + "request": { + "mbean": "scalar_value:flavor=vanilla", + "type": "read" + }, + "value": 999, + "status": 200 + }, { + "request": { + "mbean": "object_value1:flavor=chocolate", + "type": "read" + }, + "value": { + "foo": 456 + }, + "status": 200 + }, { + "request": { + "mbean": "object_value2:flavor=chocolate", + "type": "read" + }, + "value": { + "bar": 789 + }, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "compact_metric", map[string]interface{}{ + "value": 123.0, + "foo": 456.0, + "bar": 789.0, + }, map[string]string{ + "flavor": "chocolate", + "jolokia_agent_url": server.URL, + }) + + acc.AssertContainsTaggedFields(t, "compact_metric", map[string]interface{}{ + "value": 999.0, + }, map[string]string{ + "flavor": "vanilla", + "jolokia_agent_url": server.URL, + }) +} + +func TestJolokia2_ProxyTargets(t *testing.T) { + config := ` + [jolokia2_proxy] + url = "%s" + + [[jolokia2_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi" + + [[jolokia2_proxy.target]] + url = "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi" + + [[jolokia2_proxy.metric]] + name = "hello" + mbean = "hello:foo=bar"` + + response := `[{ + "request": { + "type": "read", + "mbean": "hello:foo=bar", + "target": { + "url": "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi" + } + }, + "value": 123, + "status": 200 + }, { + "request": { + "type": "read", + "mbean": "hello:foo=bar", + "target": { + "url": "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi" + } + }, + "value": 456, + "status": 200 + }]` + + server := setupServer(http.StatusOK, response) + defer server.Close() + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) + + var acc testutil.Accumulator + assert.NoError(t, plugin.Gather(&acc)) + + acc.AssertContainsTaggedFields(t, "hello", map[string]interface{}{ + "value": 123.0, + }, map[string]string{ + "jolokia_proxy_url": server.URL, + "jolokia_agent_url": "service:jmx:rmi:///jndi/rmi://target1:9010/jmxrmi", + }) + acc.AssertContainsTaggedFields(t, "hello", map[string]interface{}{ + "value": 456.0, + }, map[string]string{ + "jolokia_proxy_url": server.URL, + "jolokia_agent_url": "service:jmx:rmi:///jndi/rmi://target2:9010/jmxrmi", + }) +} + +func setupServer(status int, resp string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + //body, err := ioutil.ReadAll(r.Body) + //if err == nil { + // fmt.Println(string(body)) + //} + + fmt.Fprintln(w, resp) + })) +} + +func setupPlugin(t *testing.T, conf string) telegraf.Input { + table, err := toml.Parse([]byte(conf)) + if err != nil { + t.Fatalf("Unable to parse config! %v", err) + } + + for name, _ := range table.Fields { + object := table.Fields[name] + switch name { + case "jolokia2_agent": + plugin := JolokiaAgent{ + Metrics: []MetricConfig{}, + DefaultFieldSeparator: ".", + } + + if err := toml.UnmarshalTable(object.(*ast.Table), &plugin); err != nil { + t.Fatalf("Unable to parse jolokia_agent plugin config! %v", err) + } + + return &plugin + + case "jolokia2_proxy": + plugin := JolokiaProxy{ + Metrics: []MetricConfig{}, + DefaultFieldSeparator: ".", + } + + if err := toml.UnmarshalTable(object.(*ast.Table), &plugin); err != nil { + t.Fatalf("Unable to parse jolokia_proxy plugin config! %v", err) + } + + return &plugin + } + } + + return nil +} diff --git a/plugins/inputs/jolokia2/metric.go b/plugins/inputs/jolokia2/metric.go new file mode 100644 index 000000000..03baea124 --- /dev/null +++ b/plugins/inputs/jolokia2/metric.go @@ -0,0 +1,61 @@ +package jolokia2 + +// A MetricConfig represents a TOML form of +// a Metric with some optional fields. +type MetricConfig struct { + Name string + Mbean string + Paths []string + FieldName *string + FieldPrefix *string + FieldSeparator *string + TagPrefix *string + TagKeys []string +} + +// A Metric represents a specification for a +// Jolokia read request, and the transformations +// to apply to points generated from the responses. +type Metric struct { + Name string + Mbean string + Paths []string + FieldName string + FieldPrefix string + FieldSeparator string + TagPrefix string + TagKeys []string +} + +func NewMetric(config MetricConfig, defaultFieldPrefix, defaultFieldSeparator, defaultTagPrefix string) Metric { + metric := Metric{ + Name: config.Name, + Mbean: config.Mbean, + Paths: config.Paths, + TagKeys: config.TagKeys, + } + + if config.FieldName != nil { + metric.FieldName = *config.FieldName + } + + if config.FieldPrefix == nil { + metric.FieldPrefix = defaultFieldPrefix + } else { + metric.FieldPrefix = *config.FieldPrefix + } + + if config.FieldSeparator == nil { + metric.FieldSeparator = defaultFieldSeparator + } else { + metric.FieldSeparator = *config.FieldSeparator + } + + if config.TagPrefix == nil { + metric.TagPrefix = defaultTagPrefix + } else { + metric.TagPrefix = *config.TagPrefix + } + + return metric +} diff --git a/plugins/inputs/jolokia2/point_builder.go b/plugins/inputs/jolokia2/point_builder.go new file mode 100644 index 000000000..02877ea70 --- /dev/null +++ b/plugins/inputs/jolokia2/point_builder.go @@ -0,0 +1,271 @@ +package jolokia2 + +import ( + "fmt" + "strings" +) + +type point struct { + Tags map[string]string + Fields map[string]interface{} +} + +type pointBuilder struct { + metric Metric + objectAttributes []string + objectPath string + substitutions []string +} + +func newPointBuilder(metric Metric, attributes []string, path string) *pointBuilder { + return &pointBuilder{ + metric: metric, + objectAttributes: attributes, + objectPath: path, + substitutions: makeSubstitutionList(metric.Mbean), + } +} + +// Build generates a point for a given mbean name/pattern and value object. +func (pb *pointBuilder) Build(mbean string, value interface{}) []point { + hasPattern := strings.Contains(mbean, "*") + if !hasPattern { + value = map[string]interface{}{mbean: value} + } + + valueMap, ok := value.(map[string]interface{}) + if !ok { // FIXME: log it and move on. + panic(fmt.Sprintf("There should be a map here for %s!\n", mbean)) + } + + points := make([]point, 0) + for mbean, value := range valueMap { + + points = append(points, point{ + Tags: pb.extractTags(mbean), + Fields: pb.extractFields(mbean, value), + }) + } + + return compactPoints(points) +} + +// extractTags generates the map of tags for a given mbean name/pattern. +func (pb *pointBuilder) extractTags(mbean string) map[string]string { + propertyMap := makePropertyMap(mbean) + tagMap := make(map[string]string) + + for key, value := range propertyMap { + if pb.includeTag(key) { + tagName := pb.formatTagName(key) + tagMap[tagName] = value + } + } + + return tagMap +} + +func (pb *pointBuilder) includeTag(tagName string) bool { + for _, t := range pb.metric.TagKeys { + if tagName == t { + return true + } + } + + return false +} + +func (pb *pointBuilder) formatTagName(tagName string) string { + if tagName == "" { + return "" + } + + if tagPrefix := pb.metric.TagPrefix; tagPrefix != "" { + return tagPrefix + tagName + } + + return tagName +} + +// extractFields generates the map of fields for a given mbean name +// and value object. +func (pb *pointBuilder) extractFields(mbean string, value interface{}) map[string]interface{} { + fieldMap := make(map[string]interface{}) + valueMap, ok := value.(map[string]interface{}) + + if ok { + // complex value + if len(pb.objectAttributes) == 0 { + // if there were no attributes requested, + // then the keys are attributes + pb.fillFields("", valueMap, fieldMap) + + } else if len(pb.objectAttributes) == 1 { + // if there was a single attribute requested, + // then the keys are the attribute's properties + fieldName := pb.formatFieldName(pb.objectAttributes[0], pb.objectPath) + pb.fillFields(fieldName, valueMap, fieldMap) + + } else { + // if there were multiple attributes requested, + // then the keys are the attribute names + for _, attribute := range pb.objectAttributes { + fieldName := pb.formatFieldName(attribute, pb.objectPath) + pb.fillFields(fieldName, valueMap[attribute], fieldMap) + } + } + } else { + // scalar value + var fieldName string + if len(pb.objectAttributes) == 0 { + fieldName = pb.formatFieldName(defaultFieldName, pb.objectPath) + } else { + fieldName = pb.formatFieldName(pb.objectAttributes[0], pb.objectPath) + } + + pb.fillFields(fieldName, value, fieldMap) + } + + if len(pb.substitutions) > 1 { + pb.applySubstitutions(mbean, fieldMap) + } + + return fieldMap +} + +// formatFieldName generates a field name from the supplied attribute and +// path. The return value has the configured FieldPrefix and FieldSuffix +// instructions applied. +func (pb *pointBuilder) formatFieldName(attribute, path string) string { + fieldName := attribute + fieldPrefix := pb.metric.FieldPrefix + fieldSeparator := pb.metric.FieldSeparator + + if fieldPrefix != "" { + fieldName = fieldPrefix + fieldName + } + + if path != "" { + fieldName = fieldName + fieldSeparator + strings.Replace(path, "/", fieldSeparator, -1) + } + + return fieldName +} + +// fillFields recurses into the supplied value object, generating a named field +// for every value it discovers. +func (pb *pointBuilder) fillFields(name string, value interface{}, fieldMap map[string]interface{}) { + if valueMap, ok := value.(map[string]interface{}); ok { + // keep going until we get to something that is not a map + for key, innerValue := range valueMap { + var innerName string + + if name == "" { + innerName = pb.metric.FieldPrefix + key + } else { + innerName = name + pb.metric.FieldSeparator + key + } + + pb.fillFields(innerName, innerValue, fieldMap) + } + + return + } + + if pb.metric.FieldName != "" { + name = pb.metric.FieldName + if prefix := pb.metric.FieldPrefix; prefix != "" { + name = prefix + name + } + } + + if name == "" { + name = defaultFieldName + } + + fieldMap[name] = value +} + +// applySubstitutions updates all the keys in the supplied map +// of fields to account for $1-style substitution instructions. +func (pb *pointBuilder) applySubstitutions(mbean string, fieldMap map[string]interface{}) { + properties := makePropertyMap(mbean) + + for i, subKey := range pb.substitutions[1:] { + + symbol := fmt.Sprintf("$%d", i+1) + substitution := properties[subKey] + + for fieldName, fieldValue := range fieldMap { + newFieldName := strings.Replace(fieldName, symbol, substitution, -1) + if fieldName != newFieldName { + fieldMap[newFieldName] = fieldValue + delete(fieldMap, fieldName) + } + } + } +} + +// makePropertyMap returns a the mbean property-key list as +// a dictionary. foo:x=y becomes map[string]string { "x": "y" } +func makePropertyMap(mbean string) map[string]string { + props := make(map[string]string) + object := strings.SplitN(mbean, ":", 2) + domain := object[0] + + if domain != "" && len(object) == 2 { + list := object[1] + + for _, keyProperty := range strings.Split(list, ",") { + pair := strings.SplitN(keyProperty, "=", 2) + + if len(pair) != 2 { + continue + } + + if key := pair[0]; key != "" { + props[key] = pair[1] + } + } + } + + return props +} + +// makeSubstitutionList returns an array of values to +// use as substitutions when renaming fields +// with the $1..$N syntax. The first item in the list +// is always the mbean domain. +func makeSubstitutionList(mbean string) []string { + subs := make([]string, 0) + + object := strings.SplitN(mbean, ":", 2) + domain := object[0] + + if domain != "" && len(object) == 2 { + subs = append(subs, domain) + list := object[1] + + for _, keyProperty := range strings.Split(list, ",") { + pair := strings.SplitN(keyProperty, "=", 2) + + if len(pair) != 2 { + continue + } + + key := pair[0] + if key == "" { + continue + } + + property := pair[1] + if !strings.Contains(property, "*") { + continue + } + + subs = append(subs, key) + } + } + + return subs +}