From f05d89ed72a6b08d4c1c0fa1eb31147f800aa077 Mon Sep 17 00:00:00 2001 From: cornerot Date: Wed, 4 Nov 2015 14:03:43 +0300 Subject: [PATCH 01/11] removed "panic" from bcache plugin closes #343 --- plugins/bcache/bcache.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/plugins/bcache/bcache.go b/plugins/bcache/bcache.go index ee63f3c48..76e638ea4 100644 --- a/plugins/bcache/bcache.go +++ b/plugins/bcache/bcache.go @@ -1,6 +1,7 @@ package bcache import ( + "errors" "io/ioutil" "os" "path/filepath" @@ -34,17 +35,6 @@ func (b *Bcache) Description() string { return "Read metrics of bcache from stats_total and dirty_data" } -func getBackingDevs(bcachePath string) []string { - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 { - panic("Can't found any bcache device") - } - if err != nil { - panic(err) - } - return bdevs -} - func getTags(bdev string) map[string]string { backingDevFile, _ := os.Readlink(bdev) backingDevPath := strings.Split(backingDevFile, "/") @@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { tags := getTags(bdev) metrics, err := filepath.Glob(bdev + "/stats_total/*") if len(metrics) < 0 { - panic("Can't read any stats file") + return errors.New("Can't read any stats file") } file, err := ioutil.ReadFile(bdev + "/dirty_data") if err != nil { - panic(err) + return err } rawValue := strings.TrimSpace(string(file)) value := prettyToBytes(rawValue) @@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { file, err := ioutil.ReadFile(path) rawValue := strings.TrimSpace(string(file)) if err != nil { - panic(err) + return err } if key == "bypassed" { value := prettyToBytes(rawValue) @@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error { if len(bcachePath) == 0 { bcachePath = "/sys/fs/bcache" } - for _, bdev := range getBackingDevs(bcachePath) { + bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 { + return errors.New("Can't found any bcache device") + } + for _, bdev := range bdevs { if restrictDevs { bcacheDev := getTags(bdev)["bcache_dev"] if !bcacheDevsChecked[bcacheDev] { From 25fd4297a8d908af2a755fd6fb6d13865532be6b Mon Sep 17 00:00:00 2001 From: Simone Aiello Date: Wed, 28 Oct 2015 09:13:22 +0100 Subject: [PATCH 02/11] Jolokia plugin first commit --- plugins/all/all.go | 1 + plugins/jolokia/jolokia.go | 189 +++++++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 plugins/jolokia/jolokia.go diff --git a/plugins/all/all.go b/plugins/all/all.go index f29a4987b..d714dd81e 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go new file mode 100644 index 000000000..ee43246da --- /dev/null +++ b/plugins/jolokia/jolokia.go @@ -0,0 +1,189 @@ +package jolokia + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + // "net/url" + "strings" + // "sync" + + "github.com/influxdb/telegraf/plugins" +) + + +type Server struct { + Name string + Host string + Port string +} + +type Metric struct { + Name string + Jmx string + Pass []string + Drop []string +} + +type Jolokia struct { + + Context string + Servers []Server + Metrics []Metric + +} + + +func (j *Jolokia) SampleConfig() string { + return `[jolokia] + context = "/jolokia/read" + + [[jolokia.servers]] + name = "stable" + host = "192.168.103.2" + port = "8180" + + [[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used"] + + [[jolokia.metrics]] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + pass = ["used"] + + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + # drop = ["AllThread"] + pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] +` +} + +func (j *Jolokia) Description() string { + return "Read JMX metrics through Jolokia" +} + + + +func getAttr(url string) (map[string]interface{}, error) { + //make request + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + url, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (m *Metric) shouldPass(field string) bool { + + if m.Pass != nil { + + for _, pass := range m.Pass{ + if strings.HasPrefix(field, pass) { + return true + } + } + + return false + } + + if m.Drop != nil { + + for _, drop := range m.Drop{ + if strings.HasPrefix(field, drop) { + return false + } + } + + return true + } + + return true +} + +func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { + + for field, _ := range fields{ + if !m.shouldPass(field) { + delete(fields, field) + } + } + + return fields +} + + +func (j *Jolokia) Gather(acc plugins.Accumulator) error { + + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics + + var tags = map[string]string{ + "group": "application_server", + } + + for _, server := range servers { + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := metric.Jmx + + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + + url := "http://" + server.Host + ":" + server.Port + context + jmxPath + //fmt.Println(url) + out, _ := getAttr(url) + + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + }else{ + fmt.Println("Missing key value") + } + } + } + + return nil +} + +func init() { + plugins.Add("jolokia", func() plugins.Plugin { + return &Jolokia{} + }) +} From 2daa9ff260e4196feedefaa986629898b643dfce Mon Sep 17 00:00:00 2001 From: saiello Date: Wed, 28 Oct 2015 11:36:03 +0100 Subject: [PATCH 03/11] Added Tags as toml field --- plugins/jolokia/jolokia.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index ee43246da..4dbd2fea0 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -32,7 +32,7 @@ type Jolokia struct { Context string Servers []Server Metrics []Metric - + Tags map[string]string } @@ -40,6 +40,9 @@ func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" + [[jolokia.tags]] + group = "as" + [[jolokia.servers]] name = "stable" host = "192.168.103.2" @@ -147,10 +150,12 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { context := j.Context //"/jolokia/read" servers := j.Servers metrics := j.Metrics + tags := j.Tags + + if tags == nil{ + tags = map[string]string{} + } - var tags = map[string]string{ - "group": "application_server", - } for _, server := range servers { for _, metric := range metrics { @@ -174,7 +179,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Println("Missing key value") + fmt.Printf("Missing key 'value' in '%s' output response\n", url) } } } From 40d8aeecb096fc463b3c2148dff2de47663e43c4 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 13:25:16 +0100 Subject: [PATCH 04/11] Use url.Parse to validate configuration params --- plugins/jolokia/jolokia.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 4dbd2fea0..7ac73f02d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -6,7 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" - // "net/url" + "net/url" "strings" // "sync" @@ -72,9 +72,9 @@ func (j *Jolokia) Description() string { -func getAttr(url string) (map[string]interface{}, error) { +func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { //make request - resp, err := http.Get(url) + resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func getAttr(url string) (map[string]interface{}, error) { // Process response if resp.StatusCode != http.StatusOK { err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - url, + requestUrl, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, @@ -167,9 +167,13 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { tags["port"] = server.Port tags["host"] = server.Host - url := "http://" + server.Host + ":" + server.Port + context + jmxPath - //fmt.Println(url) - out, _ := getAttr(url) + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) + if err != nil { + return err + } + + out, _ := getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -179,7 +183,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", url) + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) } } } From 62270a369777bf309de0b38e06cfa9191e948453 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:48:39 +0100 Subject: [PATCH 05/11] go fmt run over jolokia.go --- plugins/jolokia/jolokia.go | 149 ++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 7ac73f02d..f33979b91 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -13,29 +13,26 @@ import ( "github.com/influxdb/telegraf/plugins" ) - type Server struct { - Name string - Host string - Port string + Name string + Host string + Port string } type Metric struct { - Name string - Jmx string - Pass []string - Drop []string + Name string + Jmx string + Pass []string + Drop []string } type Jolokia struct { - - Context string - Servers []Server - Metrics []Metric - Tags map[string]string + Context string + Servers []Server + Metrics []Metric + Tags map[string]string } - func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" @@ -70,100 +67,96 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } - - func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request + //make request resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } defer resp.Body.Close() - // Process response - if resp.StatusCode != http.StatusOK { - err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - requestUrl, - resp.StatusCode, - http.StatusText(resp.StatusCode), - http.StatusOK, - http.StatusText(http.StatusOK)) - return nil, err - } + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } - // read body - body, err := ioutil.ReadAll(resp.Body) + // read body + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } - // Unmarshal json - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { - return nil, errors.New("Error decoding JSON response") - } + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } - return jsonOut, nil + return jsonOut, nil } func (m *Metric) shouldPass(field string) bool { - if m.Pass != nil { + if m.Pass != nil { - for _, pass := range m.Pass{ - if strings.HasPrefix(field, pass) { + for _, pass := range m.Pass { + if strings.HasPrefix(field, pass) { return true } - } + } - return false - } + return false + } - if m.Drop != nil { + if m.Drop != nil { - for _, drop := range m.Drop{ - if strings.HasPrefix(field, drop) { + for _, drop := range m.Drop { + if strings.HasPrefix(field, drop) { return false } - } + } - return true - } + return true + } - return true + return true } func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { - for field, _ := range fields{ - if !m.shouldPass(field) { + for field, _ := range fields { + if !m.shouldPass(field) { delete(fields, field) } - } + } - return fields + return fields } - func (j *Jolokia) Gather(acc plugins.Accumulator) error { - context := j.Context //"/jolokia/read" - servers := j.Servers - metrics := j.Metrics + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics tags := j.Tags - if tags == nil{ - tags = map[string]string{} + if tags == nil { + tags = map[string]string{} } + for _, server := range servers { + for _, metric := range metrics { - for _, server := range servers { - for _, metric := range metrics { + measurement := metric.Name + jmxPath := metric.Jmx - measurement := metric.Name - jmxPath := metric.Jmx - - tags["server"] = server.Name + tags["server"] = server.Name tags["port"] = server.Port tags["host"] = server.Host @@ -173,22 +166,22 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := getAttr(requestUrl) - if values, ok := out["value"]; ok { - switch values.(type) { - case map[string]interface{}: - acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) - case interface{}: - acc.Add(measurement, values.(interface{}), tags) - } - }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) - } - } - } + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + } + } + } - return nil + return nil } func init() { From eabc0875deb6f9b1faaed0230f9b4a454736748f Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:51:15 +0100 Subject: [PATCH 06/11] Fixed sampleconfig --- plugins/jolokia/jolokia.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index f33979b91..e9230208d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -34,10 +34,10 @@ type Jolokia struct { } func (j *Jolokia) SampleConfig() string { - return `[jolokia] + return ` context = "/jolokia/read" - [[jolokia.tags]] + [jolokia.tags] group = "as" [[jolokia.servers]] From 55c598f9ff4d4090c2ffb4a1eec2430a975b9158 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 17:00:26 +0100 Subject: [PATCH 07/11] Create a JolokiaClient. allowing to inject a stub implementation --- plugins/jolokia/jolokia.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index e9230208d..a9ec11635 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -26,7 +26,20 @@ type Metric struct { Drop []string } +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + type Jolokia struct { + jClient JolokiaClient Context string Servers []Server Metrics []Metric @@ -67,9 +80,18 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } -func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request - resp, err := http.Get(requestUrl.String()) +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + if err != nil { return nil, err } @@ -166,7 +188,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := j.getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -186,6 +208,6 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { func init() { plugins.Add("jolokia", func() plugins.Plugin { - return &Jolokia{} + return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} }) } From b2e22cbc597e95960d372a311a650039cd4368bd Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:30 +0100 Subject: [PATCH 08/11] Add fields value test methods --- testutil/accumulator.go | 52 ++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 361f81e9d..d8f44ddf8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue calls CheckFieldsValue passing a single-value map as fields +func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { + return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val}) +} + // CheckValue checks that the accumulators point for the given measurement // is the same as the given value. -func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { +func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Values["value"] == val + return reflect.DeepEqual(fields, p.Values) } } - fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) + fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields) return false } @@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue( return a.ValidateTaggedValue(measurement, val, tags) == nil } -// ValidateTaggedValue validates that the given measurement and value exist -// in the accumulator and with the given tags. +// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields func (a *Accumulator) ValidateTaggedValue( measurement string, val interface{}, tags map[string]string, +) error { + return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags) +} + +// ValidateValue calls ValidateTaggedValue +func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { + return a.ValidateTaggedValue(measurement, val, nil) +} + +// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) CheckTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, +) bool { + return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil +} + +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, ) error { if tags == nil { tags = map[string]string{} @@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue( } if p.Measurement == measurement { - if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", - p.Values["value"], p.Values["value"], val, val) + if !reflect.DeepEqual(fields, p.Values) { + return fmt.Errorf("%v != %v ", fields, p.Values) } return nil } @@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue( return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } -// ValidateValue calls ValidateTaggedValue -func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { - return a.ValidateTaggedValue(measurement, val, nil) +// ValidateFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) ValidateFieldsValue( + measurement string, + fields map[string]interface{}, +) error { + return a.ValidateTaggedValue(measurement, fields, nil) } func (a *Accumulator) ValidateTaggedFields( From 921ffb7bdb974cab286ae2082e7fb4c6f75a3676 Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:53 +0100 Subject: [PATCH 09/11] Test for jolokia plugin --- plugins/jolokia/jolokia_test.go | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 plugins/jolokia/jolokia_test.go diff --git a/plugins/jolokia/jolokia_test.go b/plugins/jolokia/jolokia_test.go new file mode 100644 index 000000000..95df76e7b --- /dev/null +++ b/plugins/jolokia/jolokia_test.go @@ -0,0 +1,147 @@ +package jolokia + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} +var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia { + return &Jolokia{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValue(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, + "committed": 456130560.0, + "max": 477626368.0, + "used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueWithPass(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonSingleValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Points)) +} From acf1da4d3057919a29721d697c241ed128c2bd8a Mon Sep 17 00:00:00 2001 From: saiello Date: Tue, 3 Nov 2015 22:00:23 +0100 Subject: [PATCH 10/11] Added jolokia README.md closes #337 --- CHANGELOG.md | 1 + README.md | 1 + plugins/jolokia/README.md | 51 ++++++++++++++++++++++++++++++++++++++ plugins/jolokia/jolokia.go | 40 +++++++++++++++++++----------- 4 files changed, 78 insertions(+), 15 deletions(-) create mode 100644 plugins/jolokia/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index eee4a9ad0..ddb7388b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! +- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! ### Bugfixes - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. diff --git a/README.md b/README.md index 4b6bd636f..ae89e57d2 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ Telegraf currently has support for collecting metrics from: * exec (generic JSON-emitting executable plugin) * haproxy * httpjson (generic JSON-emitting http service plugin) +* jolokia (remote JMX with JSON over HTTP) * kafka_consumer * leofs * lustre2 diff --git a/plugins/jolokia/README.md b/plugins/jolokia/README.md new file mode 100644 index 000000000..bda0c5f93 --- /dev/null +++ b/plugins/jolokia/README.md @@ -0,0 +1,51 @@ +# Telegraf plugin: Jolokia + +#### Plugin arguments: +- **context** string: Context root used of jolokia url +- **servers** []Server: List of servers + + **name** string: Server's logical name + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port +- **metrics** []Metric + + **name** string: Name of the measure + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values + + **drop** []string: Attributes to drop when collecting values + +#### Description + +The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics +are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags. + +Given a configuration like: + +```ini +[jolokia] + +[[jolokia.servers]] + name = "as-service-1" + host = "127.0.0.1" + port = "8080" + +[[jolokia.servers]] + name = "as-service-2" + host = "127.0.0.1" + port = "8180" + +[[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used", "max"] +``` + +The collected metrics will be: + +``` +jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy +jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz +``` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index a9ec11635..1ece12cf1 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -8,7 +8,6 @@ import ( "net/http" "net/url" "strings" - // "sync" "github.com/influxdb/telegraf/plugins" ) @@ -48,31 +47,42 @@ type Jolokia struct { func (j *Jolokia) SampleConfig() string { return ` + # This is the context root used to compose the jolokia url context = "/jolokia/read" + # Tags added to each measurements [jolokia.tags] group = "as" + # List of servers exposing jolokia read service [[jolokia.servers]] - name = "stable" - host = "192.168.103.2" - port = "8180" + name = "stable" + host = "192.168.103.2" + port = "8180" + # List of metrics collected on above servers + # Each metric consists in a name, a jmx path and either a pass or drop slice attributes + # This collect all heap memory usage metrics [[jolokia.metrics]] - name = "heap_memory_usage" - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - pass = ["used"] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" - [[jolokia.metrics]] - name = "memory_eden" - jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" - pass = ["used"] + # This drops the 'committed' value from Eden space measurement [[jolokia.metrics]] - name = "heap_threads" - jmx = "/java.lang:type=Threading" - # drop = ["AllThread"] - pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + drop = [ "committed" ] + + + # This passes only DaemonThreadCount and ThreadCount + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + pass = [ + "DaemonThreadCount", + "ThreadCount" + ] ` } From 00614026b3561e86d6d74f2b9700744d7857a03e Mon Sep 17 00:00:00 2001 From: Subhachandra Chandra Date: Wed, 4 Nov 2015 17:21:42 -0800 Subject: [PATCH 11/11] Added parameters "Devices" and "SkipSerialNumber to DiskIO plugin. "Devices" can be used to specify storage devices on which stats should be reported. "SkipSerialNumber" can be used to omit the device serial number. Added tests to verify the new parameters. closes #344 --- plugins/system/disk.go | 34 +++++++++++++++--- plugins/system/system_test.go | 68 +++++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/plugins/system/disk.go b/plugins/system/disk.go index 718d79949..784dcdb13 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string { var diskSampleConfig = ` # By default, telegraf gather stats for all mountpoints. - # Setting mountpoints will restrict the stats to the specified ones. - # mountpoints. + # Setting mountpoints will restrict the stats to the specified mountpoints. # Mountpoints=["/"] ` @@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error { type DiskIOStats struct { ps PS + + Devices []string + SkipSerialNumber bool } func (_ *DiskIOStats) Description() string { return "Read metrics about disk IO by device" } -func (_ *DiskIOStats) SampleConfig() string { return "" } +var diskIoSampleConfig = ` + # By default, telegraf will gather stats for all devices including + # disk partitions. + # Setting devices will restrict the stats to the specified devcies. + # Devices=["sda","sdb"] + # Uncomment the following line if you do not need disk serial numbers. + # SkipSerialNumber = true +` + +func (_ *DiskIOStats) SampleConfig() string { + return diskIoSampleConfig +} func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { diskio, err := s.ps.DiskIO() @@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting disk io info: %s", err) } + var restrictDevices bool + devices := make(map[string]bool) + if len(s.Devices) != 0 { + restrictDevices = true + for _, dev := range s.Devices { + devices[dev] = true + } + } + for _, io := range diskio { + _, member := devices[io.Name] + if restrictDevices && !member { + continue + } tags := map[string]string{} if len(io.Name) != 0 { tags["name"] = io.Name } - if len(io.SerialNumber) != 0 { + if len(io.SerialNumber) != 0 && !s.SkipSerialNumber { tags["serial"] = io.SerialNumber } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 5839c8c60..389965e3b 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) { mps.On("DiskUsage").Return(du, nil) - diskio := disk.DiskIOCountersStat{ + diskio1 := disk.DiskIOCountersStat{ + ReadCount: 888, WriteCount: 5341, ReadBytes: 100000, @@ -84,8 +85,19 @@ func TestSystemStats_GenerateStats(t *testing.T) { IoTime: 123552, SerialNumber: "ab-123-ad", } + diskio2 := disk.DiskIOCountersStat{ + ReadCount: 444, + WriteCount: 2341, + ReadBytes: 200000, + WriteBytes: 400000, + ReadTime: 3123, + WriteTime: 6087, + Name: "sdb1", + IoTime: 246552, + SerialNumber: "bb-123-ad", + } - mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil) + mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil) netio := net.NetIOCountersStat{ Name: "eth0", @@ -262,21 +274,55 @@ func TestSystemStats_GenerateStats(t *testing.T) { assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) - err = (&DiskIOStats{&mps}).Gather(&acc) + preDiskIOPoints := len(acc.Points) + + err = (&DiskIOStats{ps: &mps}).Gather(&acc) require.NoError(t, err) - dtags := map[string]string{ + numDiskIOPoints := len(acc.Points) - preDiskIOPoints + expectedAllDiskIOPoints := 14 + assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) + + dtags1 := map[string]string{ "name": "sda1", "serial": "ab-123-ad", } + dtags2 := map[string]string{ + "name": "sdb1", + "serial": "bb-123-ad", + } - assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) + + // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" + // and serial should be missing from the tags with SkipSerialNumber set + err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) + assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points)) + + dtags3 := map[string]string{ + "name": "sdb1", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) err = (&MemStats{&mps}).Gather(&acc) require.NoError(t, err)