From d90e081f861b7711bde31e69b6d550b088d460e9 Mon Sep 17 00:00:00 2001 From: Simone Aiello Date: Wed, 28 Oct 2015 09:13:22 +0100 Subject: [PATCH 1/9] Jolokia plugin first commit --- plugins/all/all.go | 1 + plugins/jolokia/jolokia.go | 189 +++++++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 plugins/jolokia/jolokia.go diff --git a/plugins/all/all.go b/plugins/all/all.go index f29a4987b..d714dd81e 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go new file mode 100644 index 000000000..ee43246da --- /dev/null +++ b/plugins/jolokia/jolokia.go @@ -0,0 +1,189 @@ +package jolokia + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + // "net/url" + "strings" + // "sync" + + "github.com/influxdb/telegraf/plugins" +) + + +type Server struct { + Name string + Host string + Port string +} + +type Metric struct { + Name string + Jmx string + Pass []string + Drop []string +} + +type Jolokia struct { + + Context string + Servers []Server + Metrics []Metric + +} + + +func (j *Jolokia) SampleConfig() string { + return `[jolokia] + context = "/jolokia/read" + + [[jolokia.servers]] + name = "stable" + host = "192.168.103.2" + port = "8180" + + [[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used"] + + [[jolokia.metrics]] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + pass = ["used"] + + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + # drop = ["AllThread"] + pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] +` +} + +func (j *Jolokia) Description() string { + return "Read JMX metrics through Jolokia" +} + + + +func getAttr(url string) (map[string]interface{}, error) { + //make request + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + url, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (m *Metric) shouldPass(field string) bool { + + if m.Pass != nil { + + for _, pass := range m.Pass{ + if strings.HasPrefix(field, pass) { + return true + } + } + + return false + } + + if m.Drop != nil { + + for _, drop := range m.Drop{ + if strings.HasPrefix(field, drop) { + return false + } + } + + return true + } + + return true +} + +func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { + + for field, _ := range fields{ + if !m.shouldPass(field) { + delete(fields, field) + } + } + + return fields +} + + +func (j *Jolokia) Gather(acc plugins.Accumulator) error { + + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics + + var tags = map[string]string{ + "group": "application_server", + } + + for _, server := range servers { + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := metric.Jmx + + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + + url := "http://" + server.Host + ":" + server.Port + context + jmxPath + //fmt.Println(url) + out, _ := getAttr(url) + + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + }else{ + fmt.Println("Missing key value") + } + } + } + + return nil +} + +func init() { + plugins.Add("jolokia", func() plugins.Plugin { + return &Jolokia{} + }) +} From 9f1c88567863e518ec07e36e30366725877a6ca2 Mon Sep 17 00:00:00 2001 From: saiello Date: Wed, 28 Oct 2015 11:36:03 +0100 Subject: [PATCH 2/9] Added Tags as toml field --- plugins/jolokia/jolokia.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index ee43246da..4dbd2fea0 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -32,7 +32,7 @@ type Jolokia struct { Context string Servers []Server Metrics []Metric - + Tags map[string]string } @@ -40,6 +40,9 @@ func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" + [[jolokia.tags]] + group = "as" + [[jolokia.servers]] name = "stable" host = "192.168.103.2" @@ -147,10 +150,12 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { context := j.Context //"/jolokia/read" servers := j.Servers metrics := j.Metrics + tags := j.Tags + + if tags == nil{ + tags = map[string]string{} + } - var tags = map[string]string{ - "group": "application_server", - } for _, server := range servers { for _, metric := range metrics { @@ -174,7 +179,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Println("Missing key value") + fmt.Printf("Missing key 'value' in '%s' output response\n", url) } } } From a313750bd65498be2f90588e70a4677117b8c088 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 13:25:16 +0100 Subject: [PATCH 3/9] Use url.Parse to validate configuration params --- plugins/jolokia/jolokia.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 4dbd2fea0..7ac73f02d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -6,7 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" - // "net/url" + "net/url" "strings" // "sync" @@ -72,9 +72,9 @@ func (j *Jolokia) Description() string { -func getAttr(url string) (map[string]interface{}, error) { +func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { //make request - resp, err := http.Get(url) + resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func getAttr(url string) (map[string]interface{}, error) { // Process response if resp.StatusCode != http.StatusOK { err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - url, + requestUrl, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, @@ -167,9 +167,13 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { tags["port"] = server.Port tags["host"] = server.Host - url := "http://" + server.Host + ":" + server.Port + context + jmxPath - //fmt.Println(url) - out, _ := getAttr(url) + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) + if err != nil { + return err + } + + out, _ := getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -179,7 +183,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", url) + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) } } } From f765f4c018df1f94fd0f682453ece662f3c90731 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:48:39 +0100 Subject: [PATCH 4/9] go fmt run over jolokia.go --- plugins/jolokia/jolokia.go | 149 ++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 7ac73f02d..f33979b91 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -13,29 +13,26 @@ import ( "github.com/influxdb/telegraf/plugins" ) - type Server struct { - Name string - Host string - Port string + Name string + Host string + Port string } type Metric struct { - Name string - Jmx string - Pass []string - Drop []string + Name string + Jmx string + Pass []string + Drop []string } type Jolokia struct { - - Context string - Servers []Server - Metrics []Metric - Tags map[string]string + Context string + Servers []Server + Metrics []Metric + Tags map[string]string } - func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" @@ -70,100 +67,96 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } - - func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request + //make request resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } defer resp.Body.Close() - // Process response - if resp.StatusCode != http.StatusOK { - err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - requestUrl, - resp.StatusCode, - http.StatusText(resp.StatusCode), - http.StatusOK, - http.StatusText(http.StatusOK)) - return nil, err - } + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } - // read body - body, err := ioutil.ReadAll(resp.Body) + // read body + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } - // Unmarshal json - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { - return nil, errors.New("Error decoding JSON response") - } + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } - return jsonOut, nil + return jsonOut, nil } func (m *Metric) shouldPass(field string) bool { - if m.Pass != nil { + if m.Pass != nil { - for _, pass := range m.Pass{ - if strings.HasPrefix(field, pass) { + for _, pass := range m.Pass { + if strings.HasPrefix(field, pass) { return true } - } + } - return false - } + return false + } - if m.Drop != nil { + if m.Drop != nil { - for _, drop := range m.Drop{ - if strings.HasPrefix(field, drop) { + for _, drop := range m.Drop { + if strings.HasPrefix(field, drop) { return false } - } + } - return true - } + return true + } - return true + return true } func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { - for field, _ := range fields{ - if !m.shouldPass(field) { + for field, _ := range fields { + if !m.shouldPass(field) { delete(fields, field) } - } + } - return fields + return fields } - func (j *Jolokia) Gather(acc plugins.Accumulator) error { - context := j.Context //"/jolokia/read" - servers := j.Servers - metrics := j.Metrics + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics tags := j.Tags - if tags == nil{ - tags = map[string]string{} + if tags == nil { + tags = map[string]string{} } + for _, server := range servers { + for _, metric := range metrics { - for _, server := range servers { - for _, metric := range metrics { + measurement := metric.Name + jmxPath := metric.Jmx - measurement := metric.Name - jmxPath := metric.Jmx - - tags["server"] = server.Name + tags["server"] = server.Name tags["port"] = server.Port tags["host"] = server.Host @@ -173,22 +166,22 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := getAttr(requestUrl) - if values, ok := out["value"]; ok { - switch values.(type) { - case map[string]interface{}: - acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) - case interface{}: - acc.Add(measurement, values.(interface{}), tags) - } - }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) - } - } - } + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + } + } + } - return nil + return nil } func init() { From 6b13e32be4db06a8ef59d0ba1b1f8b4279c03fd0 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:51:15 +0100 Subject: [PATCH 5/9] Fixed sampleconfig --- plugins/jolokia/jolokia.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index f33979b91..e9230208d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -34,10 +34,10 @@ type Jolokia struct { } func (j *Jolokia) SampleConfig() string { - return `[jolokia] + return ` context = "/jolokia/read" - [[jolokia.tags]] + [jolokia.tags] group = "as" [[jolokia.servers]] From 7aece1fa3bea695ef002036f39d95df738ff650d Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 17:00:26 +0100 Subject: [PATCH 6/9] Create a JolokiaClient. allowing to inject a stub implementation --- plugins/jolokia/jolokia.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index e9230208d..a9ec11635 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -26,7 +26,20 @@ type Metric struct { Drop []string } +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + type Jolokia struct { + jClient JolokiaClient Context string Servers []Server Metrics []Metric @@ -67,9 +80,18 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } -func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request - resp, err := http.Get(requestUrl.String()) +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + if err != nil { return nil, err } @@ -166,7 +188,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := j.getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -186,6 +208,6 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { func init() { plugins.Add("jolokia", func() plugins.Plugin { - return &Jolokia{} + return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} }) } From 8428a31916f44c03b93b772838cc4c5b12fadecd Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:30 +0100 Subject: [PATCH 7/9] Add fields value test methods --- testutil/accumulator.go | 52 ++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 1650045b8..02d9a06a8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue calls CheckFieldsValue passing a single-value map as fields +func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { + return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val}) +} + // CheckValue checks that the accumulators point for the given measurement // is the same as the given value. -func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { +func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Values["value"] == val + return reflect.DeepEqual(fields, p.Values) } } - fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) + fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields) return false } @@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue( return a.ValidateTaggedValue(measurement, val, tags) == nil } -// ValidateTaggedValue validates that the given measurement and value exist -// in the accumulator and with the given tags. +// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields func (a *Accumulator) ValidateTaggedValue( measurement string, val interface{}, tags map[string]string, +) error { + return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags) +} + +// ValidateValue calls ValidateTaggedValue +func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { + return a.ValidateTaggedValue(measurement, val, nil) +} + +// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) CheckTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, +) bool { + return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil +} + +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, ) error { if tags == nil { tags = map[string]string{} @@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue( } if p.Measurement == measurement { - if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", - p.Values["value"], p.Values["value"], val, val) + if !reflect.DeepEqual(fields, p.Values) { + return fmt.Errorf("%v != %v ", fields, p.Values) } return nil } @@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue( return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } -// ValidateValue calls ValidateTaggedValue -func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { - return a.ValidateTaggedValue(measurement, val, nil) +// ValidateFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) ValidateFieldsValue( + measurement string, + fields map[string]interface{}, +) error { + return a.ValidateTaggedValue(measurement, fields, nil) } // HasIntValue returns true if the measurement has an Int value From abbdc3f8cf362c74b4e83538c54a7c2b7036e9e5 Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:53 +0100 Subject: [PATCH 8/9] Test for jolokia plugin --- plugins/jolokia/jolokia_test.go | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 plugins/jolokia/jolokia_test.go diff --git a/plugins/jolokia/jolokia_test.go b/plugins/jolokia/jolokia_test.go new file mode 100644 index 000000000..95df76e7b --- /dev/null +++ b/plugins/jolokia/jolokia_test.go @@ -0,0 +1,147 @@ +package jolokia + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} +var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia { + return &Jolokia{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValue(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, + "committed": 456130560.0, + "max": 477626368.0, + "used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueWithPass(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonSingleValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Points)) +} From fade65d0443699ff98a76ca5f9d870e45da3c1b1 Mon Sep 17 00:00:00 2001 From: saiello Date: Tue, 3 Nov 2015 22:00:23 +0100 Subject: [PATCH 9/9] Added jolokia README.md --- README.md | 1 + plugins/jolokia/README.md | 51 ++++++++++++++++++++++++++++++++++++++ plugins/jolokia/jolokia.go | 40 +++++++++++++++++++----------- 3 files changed, 77 insertions(+), 15 deletions(-) create mode 100644 plugins/jolokia/README.md diff --git a/README.md b/README.md index 6e5a3f171..8e244a6e5 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ Telegraf currently has support for collecting metrics from: * exec (generic JSON-emitting executable plugin) * haproxy * httpjson (generic JSON-emitting http service plugin) +* jolokia (remote JMX with JSON over HTTP) * kafka_consumer * leofs * lustre2 diff --git a/plugins/jolokia/README.md b/plugins/jolokia/README.md new file mode 100644 index 000000000..bda0c5f93 --- /dev/null +++ b/plugins/jolokia/README.md @@ -0,0 +1,51 @@ +# Telegraf plugin: Jolokia + +#### Plugin arguments: +- **context** string: Context root used of jolokia url +- **servers** []Server: List of servers + + **name** string: Server's logical name + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port +- **metrics** []Metric + + **name** string: Name of the measure + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values + + **drop** []string: Attributes to drop when collecting values + +#### Description + +The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics +are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags. + +Given a configuration like: + +```ini +[jolokia] + +[[jolokia.servers]] + name = "as-service-1" + host = "127.0.0.1" + port = "8080" + +[[jolokia.servers]] + name = "as-service-2" + host = "127.0.0.1" + port = "8180" + +[[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used", "max"] +``` + +The collected metrics will be: + +``` +jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy +jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz +``` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index a9ec11635..1ece12cf1 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -8,7 +8,6 @@ import ( "net/http" "net/url" "strings" - // "sync" "github.com/influxdb/telegraf/plugins" ) @@ -48,31 +47,42 @@ type Jolokia struct { func (j *Jolokia) SampleConfig() string { return ` + # This is the context root used to compose the jolokia url context = "/jolokia/read" + # Tags added to each measurements [jolokia.tags] group = "as" + # List of servers exposing jolokia read service [[jolokia.servers]] - name = "stable" - host = "192.168.103.2" - port = "8180" + name = "stable" + host = "192.168.103.2" + port = "8180" + # List of metrics collected on above servers + # Each metric consists in a name, a jmx path and either a pass or drop slice attributes + # This collect all heap memory usage metrics [[jolokia.metrics]] - name = "heap_memory_usage" - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - pass = ["used"] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" - [[jolokia.metrics]] - name = "memory_eden" - jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" - pass = ["used"] + # This drops the 'committed' value from Eden space measurement [[jolokia.metrics]] - name = "heap_threads" - jmx = "/java.lang:type=Threading" - # drop = ["AllThread"] - pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + drop = [ "committed" ] + + + # This passes only DaemonThreadCount and ThreadCount + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + pass = [ + "DaemonThreadCount", + "ThreadCount" + ] ` }