From dc160b307eafdb5db77f8337934df26ff695952e Mon Sep 17 00:00:00 2001 From: Simone Aiello Date: Thu, 14 Apr 2016 23:00:41 +0200 Subject: [PATCH] jolokia: add proxy mode --- plugins/inputs/jolokia/README.md | 32 ++-- plugins/inputs/jolokia/jolokia.go | 222 +++++++++++++++++++------ plugins/inputs/jolokia/jolokia_test.go | 21 ++- 3 files changed, 212 insertions(+), 63 deletions(-) diff --git a/plugins/inputs/jolokia/README.md b/plugins/inputs/jolokia/README.md index 5c7db6230..cec3c95ce 100644 --- a/plugins/inputs/jolokia/README.md +++ b/plugins/inputs/jolokia/README.md @@ -5,13 +5,22 @@ ```toml [[inputs.jolokia]] ## This is the context root used to compose the jolokia url - context = "/jolokia/read" + context = "/jolokia" - ## List of servers exposing jolokia read service + # This specifies the mode used + # mode = "proxy" + # + # When in proxy mode this section is used to specify further proxy address configurations. + # Remember to change servers addresses + # [inputs.jolokia.proxy] + # host = "127.0.0.1" + # port = "8080" + + # List of servers exposing jolokia read service [[inputs.jolokia.servers]] - name = "stable" - host = "192.168.103.2" - port = "8180" + name = "as-server-01" + host = "127.0.0.1" + port = "8080" # username = "myuser" # password = "mypassword" @@ -21,17 +30,20 @@ ## This collect all heap memory usage metrics. [[inputs.jolokia.metrics]] name = "heap_memory_usage" - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - + mbean = "java.lang:type=Memory" + attribute = "HeapMemoryUsage" + ## This collect thread counts metrics. [[inputs.jolokia.metrics]] name = "thread_count" - jmx = "/java.lang:type=Threading/TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" - + mbean = "java.lang:type=Threading" + attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" + ## This collect number of class loaded/unloaded counts metrics. [[inputs.jolokia.metrics]] name = "class_count" - jmx = "/java.lang:type=ClassLoading/LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" + mbean = "java.lang:type=ClassLoading" + attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" ``` #### Description diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index 15a01d5de..64835366e 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "time" + "bytes" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -23,7 +24,9 @@ type Server struct { type Metric struct { Name string - Jmx string + Mbean string + Attribute string + Path string } type JolokiaClient interface { @@ -41,20 +44,32 @@ func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error type Jolokia struct { jClient JolokiaClient Context string + Mode string Servers []Server Metrics []Metric + Proxy Server } func (j *Jolokia) SampleConfig() string { return ` - ## This is the context root used to compose the jolokia url - context = "/jolokia/read" + # This is the context root used to compose the jolokia url + context = "/jolokia" - ## List of servers exposing jolokia read service + # This specifies the mode used + # mode = "proxy" + # + # When in proxy mode this section is used to specify further proxy address configurations. + # Remember to change servers addresses + # [inputs.jolokia.proxy] + # host = "127.0.0.1" + # port = "8080" + + + # List of servers exposing jolokia read service [[inputs.jolokia.servers]] - name = "stable" - host = "192.168.103.2" - port = "8180" + name = "as-server-01" + host = "127.0.0.1" + port = "8080" # username = "myuser" # password = "mypassword" @@ -64,17 +79,20 @@ func (j *Jolokia) SampleConfig() string { ## This collect all heap memory usage metrics. [[inputs.jolokia.metrics]] name = "heap_memory_usage" - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - + mbean = "java.lang:type=Memory" + attribute = "HeapMemoryUsage" + ## This collect thread counts metrics. [[inputs.jolokia.metrics]] name = "thread_count" - jmx = "/java.lang:type=Threading/TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" - + mbean = "java.lang:type=Threading" + attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" + ## This collect number of class loaded/unloaded counts metrics. [[inputs.jolokia.metrics]] name = "class_count" - jmx = "/java.lang:type=ClassLoading/LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" + mbean = "java.lang:type=ClassLoading" + attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" ` } @@ -82,12 +100,7 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } -func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - // Create + send request - req, err := http.NewRequest("GET", requestUrl.String(), nil) - if err != nil { - return nil, err - } +func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) { resp, err := j.jClient.MakeRequest(req) if err != nil { @@ -98,7 +111,7 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { // Process response if resp.StatusCode != http.StatusOK { err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - requestUrl, + req.RequestURI, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, @@ -118,52 +131,159 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { return nil, errors.New("Error decoding JSON response") } + if status, ok := jsonOut["status"]; ok { + if status != float64(200) { + return nil, fmt.Errorf("Not expected status value in response body: %3.f", status) + } + } else { + return nil, fmt.Errorf("Missing status in response body") + } + return jsonOut, nil } +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + return j.doRequest(req) +} + + +func (j *Jolokia) collectMeasurement(measurement string, out map[string]interface{}, fields map[string]interface{}) { + + if values, ok := out["value"]; ok { + switch t := values.(type) { + case map[string]interface{}: + for k, v := range t { + fields[measurement+"_"+k] = v + } + case interface{}: + fields[measurement] = t + } + } else { + fmt.Printf("Missing key 'value' in output response\n") + } + +} + + func (j *Jolokia) Gather(acc telegraf.Accumulator) error { - context := j.Context //"/jolokia/read" + context := j.Context // Usually "/jolokia" servers := j.Servers metrics := j.Metrics tags := make(map[string]string) + mode := j.Mode - for _, server := range servers { - tags["server"] = server.Name - tags["port"] = server.Port - tags["host"] = server.Host - fields := make(map[string]interface{}) - for _, metric := range metrics { + if( mode == "agent" || mode == ""){ - measurement := metric.Name - jmxPath := metric.Jmx + for _, server := range servers { + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + fields := make(map[string]interface{}) + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := "/" + metric.Mbean + if metric.Attribute != "" { + jmxPath = jmxPath + "/" + metric.Attribute + + if metric.Path != "" { + jmxPath = jmxPath + "/" + metric.Path + } + } // Prepare URL - requestUrl, err := url.Parse("http://" + server.Host + ":" + - server.Port + context + jmxPath) - if err != nil { - return err - } - if server.Username != "" || server.Password != "" { - requestUrl.User = url.UserPassword(server.Username, server.Password) - } - - out, _ := j.getAttr(requestUrl) - - if values, ok := out["value"]; ok { - switch t := values.(type) { - case map[string]interface{}: - for k, v := range t { - fields[measurement+"_"+k] = v - } - case interface{}: - fields[measurement] = t + requestUrl, err := url.Parse("http://" + server.Host + ":" + + server.Port + context + "/read" + jmxPath) + if err != nil { + return err } - } else { - fmt.Printf("Missing key 'value' in '%s' output response\n", - requestUrl.String()) + if server.Username != "" || server.Password != "" { + requestUrl.User = url.UserPassword(server.Username, server.Password) + } + out, _ := j.getAttr(requestUrl) + j.collectMeasurement(measurement, out, fields) } + acc.AddFields("jolokia", fields, tags) } - acc.AddFields("jolokia", fields, tags) + + } else if ( mode == "proxy") { + + proxy := j.Proxy + + // Prepare ProxyURL + proxyURL, err := url.Parse("http://" + proxy.Host + ":" + + proxy.Port + context) + if err != nil { + return err + } + if proxy.Username != "" || proxy.Password != "" { + proxyURL.User = url.UserPassword(proxy.Username, proxy.Password) + } + + for _, server := range servers { + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + fields := make(map[string]interface{}) + for _, metric := range metrics { + + measurement := metric.Name + // Prepare URL + serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", server.Host, server.Port) + + target := map[string]string{ + "url": serviceUrl, + } + + if server.Username != "" { + target["user"] = server.Username + } + + if server.Password != "" { + target["password"] = server.Password + } + + // Create + send request + bodyContent := map[string]interface{}{ + "type": "read", + "mbean": metric.Mbean, + "target": target, + } + + if metric.Attribute != "" { + bodyContent["attribute"] = metric.Attribute + if metric.Path != "" { + bodyContent["path"] = metric.Path + } + } + + requestBody, err := json.Marshal(bodyContent) + + req, err := http.NewRequest("POST", proxyURL.String(), bytes.NewBuffer(requestBody)) + + if err != nil { + return err + } + + req.Header.Add("Content-type", "application/json") + + out, err := j.doRequest(req) + + if err != nil { + fmt.Printf("Error handling response: %s\n", err) + }else { + j.collectMeasurement(measurement, out, fields) + } + } + acc.AddFields("jolokia", fields, tags) + } + } return nil diff --git a/plugins/inputs/jolokia/jolokia_test.go b/plugins/inputs/jolokia/jolokia_test.go index 961ba7055..eb8fb12da 100644 --- a/plugins/inputs/jolokia/jolokia_test.go +++ b/plugins/inputs/jolokia/jolokia_test.go @@ -47,8 +47,10 @@ const invalidJSON = "I don't think this is JSON" const empty = "" var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} -var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} -var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var HeapMetric = Metric{Name: "heap_memory_usage", + Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage" } +var UsedHeapMetric = Metric{Name: "heap_memory_usage", + Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"} type jolokiaClientStub struct { responseBody string @@ -114,3 +116,18 @@ func TestHttpJsonOn404(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 0, len(acc.Metrics)) } + + +// Test that the proper values are ignored or collected +func TestHttpInvalidJson(t *testing.T) { + + jolokia := genJolokiaClientStub(invalidJSON, 200, Servers, + []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Metrics)) +}