change jolokia input to use bulk requests (#2253)

This commit is contained in:
Patrick Hemmer 2017-04-18 16:00:41 -04:00 committed by Daniel Nelson
parent cc44150054
commit 0f5d49a7fd
3 changed files with 215 additions and 139 deletions

View File

@ -72,6 +72,7 @@ be deprecated eventually.
- [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs - [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs
- [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin - [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
### Bugfixes ### Bugfixes

View File

@ -3,7 +3,6 @@ package jolokia
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -130,7 +129,7 @@ func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia" return "Read JMX metrics through Jolokia"
} }
func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) { func (j *Jolokia) doRequest(req *http.Request) ([]map[string]interface{}, error) {
resp, err := j.jClient.MakeRequest(req) resp, err := j.jClient.MakeRequest(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -155,27 +154,20 @@ func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) {
} }
// Unmarshal json // Unmarshal json
var jsonOut map[string]interface{} var jsonOut []map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response") return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, body)
}
if status, ok := jsonOut["status"]; ok {
if status != float64(200) {
return nil, fmt.Errorf("Not expected status value in response body: %3.f",
status)
}
} else {
return nil, fmt.Errorf("Missing status in response body")
} }
return jsonOut, nil return jsonOut, nil
} }
func (j *Jolokia) prepareRequest(server Server, metric Metric) (*http.Request, error) { func (j *Jolokia) prepareRequest(server Server, metrics []Metric) (*http.Request, error) {
var jolokiaUrl *url.URL var jolokiaUrl *url.URL
context := j.Context // Usually "/jolokia/" context := j.Context // Usually "/jolokia/"
var bulkBodyContent []map[string]interface{}
for _, metric := range metrics {
// Create bodyContent // Create bodyContent
bodyContent := map[string]interface{}{ bodyContent := map[string]interface{}{
"type": "read", "type": "read",
@ -233,7 +225,10 @@ func (j *Jolokia) prepareRequest(server Server, metric Metric) (*http.Request, e
jolokiaUrl = serverUrl jolokiaUrl = serverUrl
} }
requestBody, err := json.Marshal(bodyContent) bulkBodyContent = append(bulkBodyContent, bodyContent)
}
requestBody, err := json.Marshal(bulkBodyContent)
req, err := http.NewRequest("POST", jolokiaUrl.String(), bytes.NewBuffer(requestBody)) req, err := http.NewRequest("POST", jolokiaUrl.String(), bytes.NewBuffer(requestBody))
@ -276,25 +271,35 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
tags["jolokia_host"] = server.Host tags["jolokia_host"] = server.Host
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for _, metric := range metrics { req, err := j.prepareRequest(server, metrics)
measurement := metric.Name
req, err := j.prepareRequest(server, metric)
if err != nil { if err != nil {
return err acc.AddError(fmt.Errorf("unable to create request: %s", err))
continue
} }
out, err := j.doRequest(req) out, err := j.doRequest(req)
if err != nil { if err != nil {
fmt.Printf("Error handling response: %s\n", err) acc.AddError(fmt.Errorf("error performing request: %s", err))
} else { continue
if values, ok := out["value"]; ok {
j.extractValues(measurement, values, fields)
} else {
fmt.Printf("Missing key 'value' in output response\n")
} }
if len(out) != len(metrics) {
acc.AddError(fmt.Errorf("did not receive the correct number of metrics in response. expected %d, received %d", len(metrics), len(out)))
continue
}
for i, resp := range out {
if status, ok := resp["status"]; ok && status != float64(200) {
acc.AddError(fmt.Errorf("Not expected status value in response body (%s:%s mbean=\"%s\" attribute=\"%s\"): %3.f",
server.Host, server.Port, metrics[i].Mbean, metrics[i].Attribute, status))
continue
} else if !ok {
acc.AddError(fmt.Errorf("Missing status in response body"))
continue
}
if values, ok := resp["value"]; ok {
j.extractValues(metrics[i].Name, values, fields)
} else {
acc.AddError(fmt.Errorf("Missing key 'value' in output response\n"))
} }
} }

View File

@ -13,7 +13,8 @@ import (
) )
const validThreeLevelMultiValueJSON = ` const validThreeLevelMultiValueJSON = `
{ [
{
"request":{ "request":{
"mbean":"java.lang:type=*", "mbean":"java.lang:type=*",
"type":"read" "type":"read"
@ -41,10 +42,12 @@ const validThreeLevelMultiValueJSON = `
}, },
"timestamp":1446129191, "timestamp":1446129191,
"status":200 "status":200
}` }
]`
const validMultiValueJSON = ` const validBulkResponseJSON = `
{ [
{
"request":{ "request":{
"mbean":"java.lang:type=Memory", "mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage", "attribute":"HeapMemoryUsage",
@ -58,10 +61,46 @@ const validMultiValueJSON = `
}, },
"timestamp":1446129191, "timestamp":1446129191,
"status":200 "status":200
}` },
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"NonHeapMemoryUsage",
"type":"read"
},
"value":{
"init":2555904,
"committed":51380224,
"max":-1,
"used":49944048
},
"timestamp":1446129191,
"status":200
}
]`
const validMultiValueJSON = `
[
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":{
"init":67108864,
"committed":456130560,
"max":477626368,
"used":203288528
},
"timestamp":1446129191,
"status":200
}
]`
const validSingleValueJSON = ` const validSingleValueJSON = `
{ [
{
"request":{ "request":{
"path":"used", "path":"used",
"mbean":"java.lang:type=Memory", "mbean":"java.lang:type=Memory",
@ -71,7 +110,8 @@ const validSingleValueJSON = `
"value":209274376, "value":209274376,
"timestamp":1446129256, "timestamp":1446129256,
"status":200 "status":200
}` }
]`
const invalidJSON = "I don't think this is JSON" const invalidJSON = "I don't think this is JSON"
@ -82,6 +122,8 @@ var HeapMetric = Metric{Name: "heap_memory_usage",
Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"} Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", var UsedHeapMetric = Metric{Name: "heap_memory_usage",
Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"} Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"}
var NonHeapMetric = Metric{Name: "non_heap_memory_usage",
Mbean: "java.lang:type=Memory", Attribute: "NonHeapMemoryUsage"}
type jolokiaClientStub struct { type jolokiaClientStub struct {
responseBody string responseBody string
@ -135,6 +177,34 @@ func TestHttpJsonMultiValue(t *testing.T) {
acc.AssertContainsTaggedFields(t, "jolokia", fields, tags) acc.AssertContainsTaggedFields(t, "jolokia", fields, tags)
} }
// Test that bulk responses are handled
func TestHttpJsonBulkResponse(t *testing.T) {
jolokia := genJolokiaClientStub(validBulkResponseJSON, 200, Servers, []Metric{HeapMetric, NonHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Metrics))
fields := map[string]interface{}{
"heap_memory_usage_init": 67108864.0,
"heap_memory_usage_committed": 456130560.0,
"heap_memory_usage_max": 477626368.0,
"heap_memory_usage_used": 203288528.0,
"non_heap_memory_usage_init": 2555904.0,
"non_heap_memory_usage_committed": 51380224.0,
"non_heap_memory_usage_max": -1.0,
"non_heap_memory_usage_used": 49944048.0,
}
tags := map[string]string{
"jolokia_host": "127.0.0.1",
"jolokia_port": "8080",
"jolokia_name": "as1",
}
acc.AssertContainsTaggedFields(t, "jolokia", fields, tags)
}
// Test that the proper values are ignored or collected // Test that the proper values are ignored or collected
func TestHttpJsonThreeLevelMultiValue(t *testing.T) { func TestHttpJsonThreeLevelMultiValue(t *testing.T) {
jolokia := genJolokiaClientStub(validThreeLevelMultiValueJSON, 200, Servers, []Metric{HeapMetric}) jolokia := genJolokiaClientStub(validThreeLevelMultiValueJSON, 200, Servers, []Metric{HeapMetric})