diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c7b7c2fd..7437e4ad5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ be deprecated eventually. - [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs - [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks +- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. ### Bugfixes diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index 7f371c935..0a9122b87 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -3,7 +3,6 @@ package jolokia import ( "bytes" "encoding/json" - "errors" "fmt" "io/ioutil" "net/http" @@ -130,7 +129,7 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } -func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) { +func (j *Jolokia) doRequest(req *http.Request) ([]map[string]interface{}, error) { resp, err := j.jClient.MakeRequest(req) if err != nil { return nil, err @@ -155,85 +154,81 @@ func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) { } // Unmarshal json - var jsonOut map[string]interface{} + var jsonOut []map[string]interface{} if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { - return nil, errors.New("Error decoding JSON response") - } - - if status, ok := jsonOut["status"]; ok { - if status != float64(200) { - return nil, fmt.Errorf("Not expected status value in response body: %3.f", - status) - } - } else { - return nil, fmt.Errorf("Missing status in response body") + return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, body) } return jsonOut, nil } -func (j *Jolokia) prepareRequest(server Server, metric Metric) (*http.Request, error) { +func (j *Jolokia) prepareRequest(server Server, metrics []Metric) (*http.Request, error) { var jolokiaUrl *url.URL context := j.Context // Usually "/jolokia/" - // Create bodyContent - bodyContent := map[string]interface{}{ - "type": "read", - "mbean": metric.Mbean, + var bulkBodyContent []map[string]interface{} + for _, metric := range metrics { + // Create bodyContent + bodyContent := map[string]interface{}{ + "type": "read", + "mbean": metric.Mbean, + } + + if metric.Attribute != "" { + bodyContent["attribute"] = metric.Attribute + if metric.Path != "" { + bodyContent["path"] = metric.Path + } + } + + // Add target, only in proxy mode + if j.Mode == "proxy" { + serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", + server.Host, server.Port) + + target := map[string]string{ + "url": serviceUrl, + } + + if server.Username != "" { + target["user"] = server.Username + } + + if server.Password != "" { + target["password"] = server.Password + } + + bodyContent["target"] = target + + proxy := j.Proxy + + // Prepare ProxyURL + proxyUrl, err := url.Parse("http://" + proxy.Host + ":" + proxy.Port + context) + if err != nil { + return nil, err + } + if proxy.Username != "" || proxy.Password != "" { + proxyUrl.User = url.UserPassword(proxy.Username, proxy.Password) + } + + jolokiaUrl = proxyUrl + + } else { + serverUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context) + if err != nil { + return nil, err + } + if server.Username != "" || server.Password != "" { + serverUrl.User = url.UserPassword(server.Username, server.Password) + } + + jolokiaUrl = serverUrl + } + + bulkBodyContent = append(bulkBodyContent, bodyContent) } - if metric.Attribute != "" { - bodyContent["attribute"] = metric.Attribute - if metric.Path != "" { - bodyContent["path"] = metric.Path - } - } - - // Add target, only in proxy mode - if j.Mode == "proxy" { - serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", - server.Host, server.Port) - - target := map[string]string{ - "url": serviceUrl, - } - - if server.Username != "" { - target["user"] = server.Username - } - - if server.Password != "" { - target["password"] = server.Password - } - - bodyContent["target"] = target - - proxy := j.Proxy - - // Prepare ProxyURL - proxyUrl, err := url.Parse("http://" + proxy.Host + ":" + proxy.Port + context) - if err != nil { - return nil, err - } - if proxy.Username != "" || proxy.Password != "" { - proxyUrl.User = url.UserPassword(proxy.Username, proxy.Password) - } - - jolokiaUrl = proxyUrl - - } else { - serverUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context) - if err != nil { - return nil, err - } - if server.Username != "" || server.Password != "" { - serverUrl.User = url.UserPassword(server.Username, server.Password) - } - - jolokiaUrl = serverUrl - } - - requestBody, err := json.Marshal(bodyContent) + requestBody, err := json.Marshal(bulkBodyContent) req, err := http.NewRequest("POST", jolokiaUrl.String(), bytes.NewBuffer(requestBody)) @@ -276,25 +271,35 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error { tags["jolokia_host"] = server.Host fields := make(map[string]interface{}) - for _, metric := range metrics { - measurement := metric.Name + req, err := j.prepareRequest(server, metrics) + if err != nil { + acc.AddError(fmt.Errorf("unable to create request: %s", err)) + continue + } + out, err := j.doRequest(req) + if err != nil { + acc.AddError(fmt.Errorf("error performing request: %s", err)) + continue + } - req, err := j.prepareRequest(server, metric) - if err != nil { - return err + if len(out) != len(metrics) { + acc.AddError(fmt.Errorf("did not receive the correct number of metrics in response. expected %d, received %d", len(metrics), len(out))) + continue + } + for i, resp := range out { + if status, ok := resp["status"]; ok && status != float64(200) { + acc.AddError(fmt.Errorf("Not expected status value in response body (%s:%s mbean=\"%s\" attribute=\"%s\"): %3.f", + server.Host, server.Port, metrics[i].Mbean, metrics[i].Attribute, status)) + continue + } else if !ok { + acc.AddError(fmt.Errorf("Missing status in response body")) + continue } - out, err := j.doRequest(req) - - if err != nil { - fmt.Printf("Error handling response: %s\n", err) + if values, ok := resp["value"]; ok { + j.extractValues(metrics[i].Name, values, fields) } else { - if values, ok := out["value"]; ok { - j.extractValues(measurement, values, fields) - } else { - fmt.Printf("Missing key 'value' in output response\n") - } - + acc.AddError(fmt.Errorf("Missing key 'value' in output response\n")) } } diff --git a/plugins/inputs/jolokia/jolokia_test.go b/plugins/inputs/jolokia/jolokia_test.go index 3c4fc2561..cf415f36f 100644 --- a/plugins/inputs/jolokia/jolokia_test.go +++ b/plugins/inputs/jolokia/jolokia_test.go @@ -13,65 +13,105 @@ import ( ) const validThreeLevelMultiValueJSON = ` -{ - "request":{ - "mbean":"java.lang:type=*", - "type":"read" +[ + { + "request":{ + "mbean":"java.lang:type=*", + "type":"read" + }, + "value":{ + "java.lang:type=Memory":{ + "ObjectPendingFinalizationCount":0, + "Verbose":false, + "HeapMemoryUsage":{ + "init":134217728, + "committed":173015040, + "max":1908932608, + "used":16840016 + }, + "NonHeapMemoryUsage":{ + "init":2555904, + "committed":51380224, + "max":-1, + "used":49944048 + }, + "ObjectName":{ + "objectName":"java.lang:type=Memory" + } + } + }, + "timestamp":1446129191, + "status":200 + } +]` + +const validBulkResponseJSON = ` +[ + { + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 }, - "value":{ - "java.lang:type=Memory":{ - "ObjectPendingFinalizationCount":0, - "Verbose":false, - "HeapMemoryUsage":{ - "init":134217728, - "committed":173015040, - "max":1908932608, - "used":16840016 - }, - "NonHeapMemoryUsage":{ - "init":2555904, - "committed":51380224, - "max":-1, - "used":49944048 - }, - "ObjectName":{ - "objectName":"java.lang:type=Memory" - } - } - }, - "timestamp":1446129191, - "status":200 -}` + { + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"NonHeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":2555904, + "committed":51380224, + "max":-1, + "used":49944048 + }, + "timestamp":1446129191, + "status":200 + } +]` const validMultiValueJSON = ` -{ - "request":{ - "mbean":"java.lang:type=Memory", - "attribute":"HeapMemoryUsage", - "type":"read" - }, - "value":{ - "init":67108864, - "committed":456130560, - "max":477626368, - "used":203288528 - }, - "timestamp":1446129191, - "status":200 -}` +[ + { + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 + } +]` const validSingleValueJSON = ` -{ - "request":{ - "path":"used", - "mbean":"java.lang:type=Memory", - "attribute":"HeapMemoryUsage", - "type":"read" - }, - "value":209274376, - "timestamp":1446129256, - "status":200 -}` +[ + { + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 + } +]` const invalidJSON = "I don't think this is JSON" @@ -82,6 +122,8 @@ var HeapMetric = Metric{Name: "heap_memory_usage", Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"} var UsedHeapMetric = Metric{Name: "heap_memory_usage", Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"} +var NonHeapMetric = Metric{Name: "non_heap_memory_usage", + Mbean: "java.lang:type=Memory", Attribute: "NonHeapMemoryUsage"} type jolokiaClientStub struct { responseBody string @@ -135,6 +177,34 @@ func TestHttpJsonMultiValue(t *testing.T) { acc.AssertContainsTaggedFields(t, "jolokia", fields, tags) } +// Test that bulk responses are handled +func TestHttpJsonBulkResponse(t *testing.T) { + jolokia := genJolokiaClientStub(validBulkResponseJSON, 200, Servers, []Metric{HeapMetric, NonHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Metrics)) + + fields := map[string]interface{}{ + "heap_memory_usage_init": 67108864.0, + "heap_memory_usage_committed": 456130560.0, + "heap_memory_usage_max": 477626368.0, + "heap_memory_usage_used": 203288528.0, + "non_heap_memory_usage_init": 2555904.0, + "non_heap_memory_usage_committed": 51380224.0, + "non_heap_memory_usage_max": -1.0, + "non_heap_memory_usage_used": 49944048.0, + } + tags := map[string]string{ + "jolokia_host": "127.0.0.1", + "jolokia_port": "8080", + "jolokia_name": "as1", + } + acc.AssertContainsTaggedFields(t, "jolokia", fields, tags) +} + // Test that the proper values are ignored or collected func TestHttpJsonThreeLevelMultiValue(t *testing.T) { jolokia := genJolokiaClientStub(validThreeLevelMultiValueJSON, 200, Servers, []Metric{HeapMetric})