From befc9061676fa6a583bab4d826245ae5f7a3f8ef Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 2 Nov 2015 16:19:37 -0700 Subject: [PATCH 01/15] Improve the HTTP JSON plugin README with more examples. --- plugins/httpjson/README.md | 101 ++++++++++++++++++++--- plugins/httpjson/httpjson.go | 6 +- plugins/kafka_consumer/kafka_consumer.go | 14 +++- 3 files changed, 107 insertions(+), 14 deletions(-) diff --git a/plugins/httpjson/README.md b/plugins/httpjson/README.md index fd8bcb4cb..d016633a7 100644 --- a/plugins/httpjson/README.md +++ b/plugins/httpjson/README.md @@ -2,7 +2,8 @@ The httpjson plugin can collect data from remote URLs which respond with JSON. Then it flattens JSON and finds all numeric values, treating them as floats. -For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats http://my.service.com/_stats: +For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats at http://my.service.com/_stats, you would configure the HTTP JSON +plugin like this: ``` [[httpjson.services]] @@ -16,11 +17,11 @@ For example, if you have a service called _mycollector_, which has HTTP endpoint method = "GET" ``` -The name is used as a prefix for the measurements. +`name` is used as a prefix for the measurements. -The `method` specifies HTTP method to use for requests. +`method` specifies HTTP method to use for requests. -You can specify which keys from server response should be considered as tags: +You can also specify which keys from server response should be considered tags: ``` [[httpjson.services]] @@ -32,8 +33,6 @@ You can specify which keys from server response should be considered as tags: ] ``` -**NOTE**: tag values should be strings. - You can also specify additional request parameters for the service: ``` @@ -47,11 +46,30 @@ You can also specify additional request parameters for the service: ``` -# Sample +# Example: + +Let's say that we have a service named "mycollector" configured like this: + +``` +[httpjson] + [[httpjson.services]] + name = "mycollector" + + servers = [ + "http://my.service.com/_stats" + ] + + # HTTP method to use (case-sensitive) + method = "GET" + + tag_keys = ["service"] +``` + +which responds with the following JSON: -Let's say that we have a service named "mycollector", which responds with: ```json { + "service": "service01", "a": 0.5, "b": { "c": "some text", @@ -63,7 +81,68 @@ Let's say that we have a service named "mycollector", which responds with: The collected metrics will be: ``` -httpjson_mycollector_a value=0.5 -httpjson_mycollector_b_d value=0.1 -httpjson_mycollector_b_e value=5 +httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5 +``` + +# Example 2, Multiple Services: + +There is also the option to collect JSON from multiple services, here is an +example doing that. + +``` +[httpjson] + [[httpjson.services]] + name = "mycollector1" + + servers = [ + "http://my.service1.com/_stats" + ] + + # HTTP method to use (case-sensitive) + method = "GET" + + [[httpjson.services]] + name = "mycollector2" + + servers = [ + "http://service.net/json/stats" + ] + + # HTTP method to use (case-sensitive) + method = "POST" +``` + +The services respond with the following JSON: + +mycollector1: +```json +{ + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + } +} +``` + +mycollector2: +```json +{ + "load": 100, + "users": 1335 +} +``` + +The collected metrics will be: + +``` +httpjson_mycollector1_a,server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector1_b_d,server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5 + +httpjson_mycollector2_load,server='http://service.net/json/stats' value=100 +httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335 ``` diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go index a3f02e65f..b5913e2e1 100644 --- a/plugins/httpjson/httpjson.go +++ b/plugins/httpjson/httpjson.go @@ -127,7 +127,11 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error { // // Returns: // error: Any error that may have occurred -func (h *HttpJson) gatherServer(acc plugins.Accumulator, service Service, serverURL string) error { +func (h *HttpJson) gatherServer( + acc plugins.Accumulator, + service Service, + serverURL string, +) error { resp, err := h.sendRequest(service, serverURL) if err != nil { return err diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go index 7c1258944..1c0fbe266 100644 --- a/plugins/kafka_consumer/kafka_consumer.go +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -74,7 +74,11 @@ func (k *Kafka) Gather(acc plugins.Accumulator) error { k.Consumer.Close() }() - go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt) + go readFromKafka(k.Consumer.Messages(), + metricQueue, + k.BatchSize, + k.Consumer.CommitUpto, + halt) } return emitMetrics(k, acc, metricQueue) @@ -105,7 +109,13 @@ const millisecond = 1000000 * time.Nanosecond type ack func(*sarama.ConsumerMessage) error -func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) { +func readFromKafka( + kafkaMsgs <-chan *sarama.ConsumerMessage, + metricProducer chan<- []byte, + maxBatchSize int, + ackMsg ack, + halt <-chan bool, +) { batch := make([]byte, 0) currentBatchSize := 0 timeout := time.After(500 * millisecond) From 6794fd06eb880fc26ac0b9e1bdbd83238c667759 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 3 Nov 2015 11:18:57 -0700 Subject: [PATCH 02/15] Suggest running as telegraf user in test mode in README Fixes #330 --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index aacf7a9ae..4b6bd636f 100644 --- a/README.md +++ b/README.md @@ -76,9 +76,12 @@ if you don't have it already. You also must build with golang version 1.4+. * Run `telegraf -sample-config > telegraf.conf` to create an initial configuration. * Or run `telegraf -sample-config -filter cpu:mem -outputfilter influxdb > telegraf.conf`. -to create a config file with only CPU and memory plugins defined, and InfluxDB output defined. +to create a config file with only CPU and memory plugins defined, and InfluxDB +output defined. * Edit the configuration to match your needs. -* Run `telegraf -config telegraf.conf -test` to output one full measurement sample to STDOUT. +* Run `telegraf -config telegraf.conf -test` to output one full measurement +sample to STDOUT. NOTE: you may want to run as the telegraf user if you are using +the linux packages `sudo -u telegraf telegraf -config telegraf.conf -test` * Run `telegraf -config telegraf.conf` to gather and send metrics to configured outputs. * Run `telegraf -config telegraf.conf -filter system:swap`. to run telegraf with only the system & swap plugins defined in the config. From e2854232d0d6b517cde8a1bf13628ae4b3bf8506 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 3 Nov 2015 11:04:52 -0700 Subject: [PATCH 03/15] Change HAProxy plugin tag from host to server fixes #342 --- CHANGELOG.md | 1 + plugins/haproxy/haproxy.go | 6 +++--- plugins/haproxy/haproxy_test.go | 12 ++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e10bd93d..eee4a9ad0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and `make docker-kill`. `make test` will still run all unit tests with docker. - Long unit tests are now run in CircleCI, with docker & race detector - Redis plugin tag has changed from `host` to `server` +- HAProxy plugin tag has changed from `host` to `server` ### Features - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! diff --git a/plugins/haproxy/haproxy.go b/plugins/haproxy/haproxy.go index a1d14a32a..e28baf1e8 100644 --- a/plugins/haproxy/haproxy.go +++ b/plugins/haproxy/haproxy.go @@ -165,9 +165,9 @@ func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]str for field, v := range row { tags := map[string]string{ - "host": host, - "proxy": row[HF_PXNAME], - "sv": row[HF_SVNAME], + "server": host, + "proxy": row[HF_PXNAME], + "sv": row[HF_SVNAME], } switch field { case HF_QCUR: diff --git a/plugins/haproxy/haproxy_test.go b/plugins/haproxy/haproxy_test.go index 0d63985d4..b87618700 100644 --- a/plugins/haproxy/haproxy_test.go +++ b/plugins/haproxy/haproxy_test.go @@ -42,9 +42,9 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) { require.NoError(t, err) tags := map[string]string{ - "host": ts.Listener.Addr().String(), - "proxy": "be_app", - "sv": "host0", + "server": ts.Listener.Addr().String(), + "proxy": "be_app", + "sv": "host0", } assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags)) @@ -109,9 +109,9 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) { require.NoError(t, err) tags := map[string]string{ - "proxy": "be_app", - "host": ts.Listener.Addr().String(), - "sv": "host0", + "proxy": "be_app", + "server": ts.Listener.Addr().String(), + "sv": "host0", } assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags)) From 4c2501be952061fc052b8c6613dc154f440bc42f Mon Sep 17 00:00:00 2001 From: Sean Beckett Date: Wed, 4 Nov 2015 15:30:01 -0800 Subject: [PATCH 04/15] updating Golang crypto --- LICENSE_OF_DEPENDENCIES.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index c16a73007..417d47d0e 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -28,6 +28,5 @@ - github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE) - gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE) - gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE) +- golang.org/x/crypto/* [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- golang.org/x/crypto/blowfish -- golang.org/x/crypto/bcrypt From f05d89ed72a6b08d4c1c0fa1eb31147f800aa077 Mon Sep 17 00:00:00 2001 From: cornerot Date: Wed, 4 Nov 2015 14:03:43 +0300 Subject: [PATCH 05/15] removed "panic" from bcache plugin closes #343 --- plugins/bcache/bcache.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/plugins/bcache/bcache.go b/plugins/bcache/bcache.go index ee63f3c48..76e638ea4 100644 --- a/plugins/bcache/bcache.go +++ b/plugins/bcache/bcache.go @@ -1,6 +1,7 @@ package bcache import ( + "errors" "io/ioutil" "os" "path/filepath" @@ -34,17 +35,6 @@ func (b *Bcache) Description() string { return "Read metrics of bcache from stats_total and dirty_data" } -func getBackingDevs(bcachePath string) []string { - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 { - panic("Can't found any bcache device") - } - if err != nil { - panic(err) - } - return bdevs -} - func getTags(bdev string) map[string]string { backingDevFile, _ := os.Readlink(bdev) backingDevPath := strings.Split(backingDevFile, "/") @@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { tags := getTags(bdev) metrics, err := filepath.Glob(bdev + "/stats_total/*") if len(metrics) < 0 { - panic("Can't read any stats file") + return errors.New("Can't read any stats file") } file, err := ioutil.ReadFile(bdev + "/dirty_data") if err != nil { - panic(err) + return err } rawValue := strings.TrimSpace(string(file)) value := prettyToBytes(rawValue) @@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { file, err := ioutil.ReadFile(path) rawValue := strings.TrimSpace(string(file)) if err != nil { - panic(err) + return err } if key == "bypassed" { value := prettyToBytes(rawValue) @@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error { if len(bcachePath) == 0 { bcachePath = "/sys/fs/bcache" } - for _, bdev := range getBackingDevs(bcachePath) { + bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 { + return errors.New("Can't found any bcache device") + } + for _, bdev := range bdevs { if restrictDevs { bcacheDev := getTags(bdev)["bcache_dev"] if !bcacheDevsChecked[bcacheDev] { From 25fd4297a8d908af2a755fd6fb6d13865532be6b Mon Sep 17 00:00:00 2001 From: Simone Aiello Date: Wed, 28 Oct 2015 09:13:22 +0100 Subject: [PATCH 06/15] Jolokia plugin first commit --- plugins/all/all.go | 1 + plugins/jolokia/jolokia.go | 189 +++++++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 plugins/jolokia/jolokia.go diff --git a/plugins/all/all.go b/plugins/all/all.go index f29a4987b..d714dd81e 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go new file mode 100644 index 000000000..ee43246da --- /dev/null +++ b/plugins/jolokia/jolokia.go @@ -0,0 +1,189 @@ +package jolokia + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + // "net/url" + "strings" + // "sync" + + "github.com/influxdb/telegraf/plugins" +) + + +type Server struct { + Name string + Host string + Port string +} + +type Metric struct { + Name string + Jmx string + Pass []string + Drop []string +} + +type Jolokia struct { + + Context string + Servers []Server + Metrics []Metric + +} + + +func (j *Jolokia) SampleConfig() string { + return `[jolokia] + context = "/jolokia/read" + + [[jolokia.servers]] + name = "stable" + host = "192.168.103.2" + port = "8180" + + [[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used"] + + [[jolokia.metrics]] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + pass = ["used"] + + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + # drop = ["AllThread"] + pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] +` +} + +func (j *Jolokia) Description() string { + return "Read JMX metrics through Jolokia" +} + + + +func getAttr(url string) (map[string]interface{}, error) { + //make request + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + url, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (m *Metric) shouldPass(field string) bool { + + if m.Pass != nil { + + for _, pass := range m.Pass{ + if strings.HasPrefix(field, pass) { + return true + } + } + + return false + } + + if m.Drop != nil { + + for _, drop := range m.Drop{ + if strings.HasPrefix(field, drop) { + return false + } + } + + return true + } + + return true +} + +func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { + + for field, _ := range fields{ + if !m.shouldPass(field) { + delete(fields, field) + } + } + + return fields +} + + +func (j *Jolokia) Gather(acc plugins.Accumulator) error { + + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics + + var tags = map[string]string{ + "group": "application_server", + } + + for _, server := range servers { + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := metric.Jmx + + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + + url := "http://" + server.Host + ":" + server.Port + context + jmxPath + //fmt.Println(url) + out, _ := getAttr(url) + + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + }else{ + fmt.Println("Missing key value") + } + } + } + + return nil +} + +func init() { + plugins.Add("jolokia", func() plugins.Plugin { + return &Jolokia{} + }) +} From 2daa9ff260e4196feedefaa986629898b643dfce Mon Sep 17 00:00:00 2001 From: saiello Date: Wed, 28 Oct 2015 11:36:03 +0100 Subject: [PATCH 07/15] Added Tags as toml field --- plugins/jolokia/jolokia.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index ee43246da..4dbd2fea0 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -32,7 +32,7 @@ type Jolokia struct { Context string Servers []Server Metrics []Metric - + Tags map[string]string } @@ -40,6 +40,9 @@ func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" + [[jolokia.tags]] + group = "as" + [[jolokia.servers]] name = "stable" host = "192.168.103.2" @@ -147,10 +150,12 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { context := j.Context //"/jolokia/read" servers := j.Servers metrics := j.Metrics + tags := j.Tags + + if tags == nil{ + tags = map[string]string{} + } - var tags = map[string]string{ - "group": "application_server", - } for _, server := range servers { for _, metric := range metrics { @@ -174,7 +179,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Println("Missing key value") + fmt.Printf("Missing key 'value' in '%s' output response\n", url) } } } From 40d8aeecb096fc463b3c2148dff2de47663e43c4 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 13:25:16 +0100 Subject: [PATCH 08/15] Use url.Parse to validate configuration params --- plugins/jolokia/jolokia.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 4dbd2fea0..7ac73f02d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -6,7 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" - // "net/url" + "net/url" "strings" // "sync" @@ -72,9 +72,9 @@ func (j *Jolokia) Description() string { -func getAttr(url string) (map[string]interface{}, error) { +func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { //make request - resp, err := http.Get(url) + resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func getAttr(url string) (map[string]interface{}, error) { // Process response if resp.StatusCode != http.StatusOK { err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - url, + requestUrl, resp.StatusCode, http.StatusText(resp.StatusCode), http.StatusOK, @@ -167,9 +167,13 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { tags["port"] = server.Port tags["host"] = server.Host - url := "http://" + server.Host + ":" + server.Port + context + jmxPath - //fmt.Println(url) - out, _ := getAttr(url) + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) + if err != nil { + return err + } + + out, _ := getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -179,7 +183,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { acc.Add(measurement, values.(interface{}), tags) } }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", url) + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) } } } From 62270a369777bf309de0b38e06cfa9191e948453 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:48:39 +0100 Subject: [PATCH 09/15] go fmt run over jolokia.go --- plugins/jolokia/jolokia.go | 149 ++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 7ac73f02d..f33979b91 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -13,29 +13,26 @@ import ( "github.com/influxdb/telegraf/plugins" ) - type Server struct { - Name string - Host string - Port string + Name string + Host string + Port string } type Metric struct { - Name string - Jmx string - Pass []string - Drop []string + Name string + Jmx string + Pass []string + Drop []string } type Jolokia struct { - - Context string - Servers []Server - Metrics []Metric - Tags map[string]string + Context string + Servers []Server + Metrics []Metric + Tags map[string]string } - func (j *Jolokia) SampleConfig() string { return `[jolokia] context = "/jolokia/read" @@ -70,100 +67,96 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } - - func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request + //make request resp, err := http.Get(requestUrl.String()) if err != nil { return nil, err } defer resp.Body.Close() - // Process response - if resp.StatusCode != http.StatusOK { - err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", - requestUrl, - resp.StatusCode, - http.StatusText(resp.StatusCode), - http.StatusOK, - http.StatusText(http.StatusOK)) - return nil, err - } + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } - // read body - body, err := ioutil.ReadAll(resp.Body) + // read body + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } - // Unmarshal json - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { - return nil, errors.New("Error decoding JSON response") - } + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } - return jsonOut, nil + return jsonOut, nil } func (m *Metric) shouldPass(field string) bool { - if m.Pass != nil { + if m.Pass != nil { - for _, pass := range m.Pass{ - if strings.HasPrefix(field, pass) { + for _, pass := range m.Pass { + if strings.HasPrefix(field, pass) { return true } - } + } - return false - } + return false + } - if m.Drop != nil { + if m.Drop != nil { - for _, drop := range m.Drop{ - if strings.HasPrefix(field, drop) { + for _, drop := range m.Drop { + if strings.HasPrefix(field, drop) { return false } - } + } - return true - } + return true + } - return true + return true } func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { - for field, _ := range fields{ - if !m.shouldPass(field) { + for field, _ := range fields { + if !m.shouldPass(field) { delete(fields, field) } - } + } - return fields + return fields } - func (j *Jolokia) Gather(acc plugins.Accumulator) error { - context := j.Context //"/jolokia/read" - servers := j.Servers - metrics := j.Metrics + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics tags := j.Tags - if tags == nil{ - tags = map[string]string{} + if tags == nil { + tags = map[string]string{} } + for _, server := range servers { + for _, metric := range metrics { - for _, server := range servers { - for _, metric := range metrics { + measurement := metric.Name + jmxPath := metric.Jmx - measurement := metric.Name - jmxPath := metric.Jmx - - tags["server"] = server.Name + tags["server"] = server.Name tags["port"] = server.Port tags["host"] = server.Host @@ -173,22 +166,22 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := getAttr(requestUrl) - if values, ok := out["value"]; ok { - switch values.(type) { - case map[string]interface{}: - acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) - case interface{}: - acc.Add(measurement, values.(interface{}), tags) - } - }else{ - fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) - } - } - } + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + } + } + } - return nil + return nil } func init() { From eabc0875deb6f9b1faaed0230f9b4a454736748f Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 14:51:15 +0100 Subject: [PATCH 10/15] Fixed sampleconfig --- plugins/jolokia/jolokia.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index f33979b91..e9230208d 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -34,10 +34,10 @@ type Jolokia struct { } func (j *Jolokia) SampleConfig() string { - return `[jolokia] + return ` context = "/jolokia/read" - [[jolokia.tags]] + [jolokia.tags] group = "as" [[jolokia.servers]] From 55c598f9ff4d4090c2ffb4a1eec2430a975b9158 Mon Sep 17 00:00:00 2001 From: saiello Date: Thu, 29 Oct 2015 17:00:26 +0100 Subject: [PATCH 11/15] Create a JolokiaClient. allowing to inject a stub implementation --- plugins/jolokia/jolokia.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index e9230208d..a9ec11635 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -26,7 +26,20 @@ type Metric struct { Drop []string } +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + type Jolokia struct { + jClient JolokiaClient Context string Servers []Server Metrics []Metric @@ -67,9 +80,18 @@ func (j *Jolokia) Description() string { return "Read JMX metrics through Jolokia" } -func getAttr(requestUrl *url.URL) (map[string]interface{}, error) { - //make request - resp, err := http.Get(requestUrl.String()) +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + if err != nil { return nil, err } @@ -166,7 +188,7 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { return err } - out, _ := getAttr(requestUrl) + out, _ := j.getAttr(requestUrl) if values, ok := out["value"]; ok { switch values.(type) { @@ -186,6 +208,6 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { func init() { plugins.Add("jolokia", func() plugins.Plugin { - return &Jolokia{} + return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} }) } From b2e22cbc597e95960d372a311a650039cd4368bd Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:30 +0100 Subject: [PATCH 12/15] Add fields value test methods --- testutil/accumulator.go | 52 ++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 361f81e9d..d8f44ddf8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue calls CheckFieldsValue passing a single-value map as fields +func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { + return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val}) +} + // CheckValue checks that the accumulators point for the given measurement // is the same as the given value. -func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { +func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Values["value"] == val + return reflect.DeepEqual(fields, p.Values) } } - fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) + fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields) return false } @@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue( return a.ValidateTaggedValue(measurement, val, tags) == nil } -// ValidateTaggedValue validates that the given measurement and value exist -// in the accumulator and with the given tags. +// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields func (a *Accumulator) ValidateTaggedValue( measurement string, val interface{}, tags map[string]string, +) error { + return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags) +} + +// ValidateValue calls ValidateTaggedValue +func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { + return a.ValidateTaggedValue(measurement, val, nil) +} + +// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) CheckTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, +) bool { + return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil +} + +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, ) error { if tags == nil { tags = map[string]string{} @@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue( } if p.Measurement == measurement { - if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", - p.Values["value"], p.Values["value"], val, val) + if !reflect.DeepEqual(fields, p.Values) { + return fmt.Errorf("%v != %v ", fields, p.Values) } return nil } @@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue( return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } -// ValidateValue calls ValidateTaggedValue -func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { - return a.ValidateTaggedValue(measurement, val, nil) +// ValidateFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) ValidateFieldsValue( + measurement string, + fields map[string]interface{}, +) error { + return a.ValidateTaggedValue(measurement, fields, nil) } func (a *Accumulator) ValidateTaggedFields( From 921ffb7bdb974cab286ae2082e7fb4c6f75a3676 Mon Sep 17 00:00:00 2001 From: saiello Date: Mon, 2 Nov 2015 12:09:53 +0100 Subject: [PATCH 13/15] Test for jolokia plugin --- plugins/jolokia/jolokia_test.go | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 plugins/jolokia/jolokia_test.go diff --git a/plugins/jolokia/jolokia_test.go b/plugins/jolokia/jolokia_test.go new file mode 100644 index 000000000..95df76e7b --- /dev/null +++ b/plugins/jolokia/jolokia_test.go @@ -0,0 +1,147 @@ +package jolokia + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} +var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia { + return &Jolokia{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValue(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, + "committed": 456130560.0, + "max": 477626368.0, + "used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueWithPass(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonSingleValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Points)) +} From acf1da4d3057919a29721d697c241ed128c2bd8a Mon Sep 17 00:00:00 2001 From: saiello Date: Tue, 3 Nov 2015 22:00:23 +0100 Subject: [PATCH 14/15] Added jolokia README.md closes #337 --- CHANGELOG.md | 1 + README.md | 1 + plugins/jolokia/README.md | 51 ++++++++++++++++++++++++++++++++++++++ plugins/jolokia/jolokia.go | 40 +++++++++++++++++++----------- 4 files changed, 78 insertions(+), 15 deletions(-) create mode 100644 plugins/jolokia/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index eee4a9ad0..ddb7388b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! +- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! ### Bugfixes - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. diff --git a/README.md b/README.md index 4b6bd636f..ae89e57d2 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ Telegraf currently has support for collecting metrics from: * exec (generic JSON-emitting executable plugin) * haproxy * httpjson (generic JSON-emitting http service plugin) +* jolokia (remote JMX with JSON over HTTP) * kafka_consumer * leofs * lustre2 diff --git a/plugins/jolokia/README.md b/plugins/jolokia/README.md new file mode 100644 index 000000000..bda0c5f93 --- /dev/null +++ b/plugins/jolokia/README.md @@ -0,0 +1,51 @@ +# Telegraf plugin: Jolokia + +#### Plugin arguments: +- **context** string: Context root used of jolokia url +- **servers** []Server: List of servers + + **name** string: Server's logical name + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port +- **metrics** []Metric + + **name** string: Name of the measure + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values + + **drop** []string: Attributes to drop when collecting values + +#### Description + +The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics +are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags. + +Given a configuration like: + +```ini +[jolokia] + +[[jolokia.servers]] + name = "as-service-1" + host = "127.0.0.1" + port = "8080" + +[[jolokia.servers]] + name = "as-service-2" + host = "127.0.0.1" + port = "8180" + +[[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used", "max"] +``` + +The collected metrics will be: + +``` +jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy +jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz +``` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index a9ec11635..1ece12cf1 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -8,7 +8,6 @@ import ( "net/http" "net/url" "strings" - // "sync" "github.com/influxdb/telegraf/plugins" ) @@ -48,31 +47,42 @@ type Jolokia struct { func (j *Jolokia) SampleConfig() string { return ` + # This is the context root used to compose the jolokia url context = "/jolokia/read" + # Tags added to each measurements [jolokia.tags] group = "as" + # List of servers exposing jolokia read service [[jolokia.servers]] - name = "stable" - host = "192.168.103.2" - port = "8180" + name = "stable" + host = "192.168.103.2" + port = "8180" + # List of metrics collected on above servers + # Each metric consists in a name, a jmx path and either a pass or drop slice attributes + # This collect all heap memory usage metrics [[jolokia.metrics]] - name = "heap_memory_usage" - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - pass = ["used"] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" - [[jolokia.metrics]] - name = "memory_eden" - jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" - pass = ["used"] + # This drops the 'committed' value from Eden space measurement [[jolokia.metrics]] - name = "heap_threads" - jmx = "/java.lang:type=Threading" - # drop = ["AllThread"] - pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + drop = [ "committed" ] + + + # This passes only DaemonThreadCount and ThreadCount + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + pass = [ + "DaemonThreadCount", + "ThreadCount" + ] ` } From 00614026b3561e86d6d74f2b9700744d7857a03e Mon Sep 17 00:00:00 2001 From: Subhachandra Chandra Date: Wed, 4 Nov 2015 17:21:42 -0800 Subject: [PATCH 15/15] Added parameters "Devices" and "SkipSerialNumber to DiskIO plugin. "Devices" can be used to specify storage devices on which stats should be reported. "SkipSerialNumber" can be used to omit the device serial number. Added tests to verify the new parameters. closes #344 --- plugins/system/disk.go | 34 +++++++++++++++--- plugins/system/system_test.go | 68 +++++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/plugins/system/disk.go b/plugins/system/disk.go index 718d79949..784dcdb13 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string { var diskSampleConfig = ` # By default, telegraf gather stats for all mountpoints. - # Setting mountpoints will restrict the stats to the specified ones. - # mountpoints. + # Setting mountpoints will restrict the stats to the specified mountpoints. # Mountpoints=["/"] ` @@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error { type DiskIOStats struct { ps PS + + Devices []string + SkipSerialNumber bool } func (_ *DiskIOStats) Description() string { return "Read metrics about disk IO by device" } -func (_ *DiskIOStats) SampleConfig() string { return "" } +var diskIoSampleConfig = ` + # By default, telegraf will gather stats for all devices including + # disk partitions. + # Setting devices will restrict the stats to the specified devcies. + # Devices=["sda","sdb"] + # Uncomment the following line if you do not need disk serial numbers. + # SkipSerialNumber = true +` + +func (_ *DiskIOStats) SampleConfig() string { + return diskIoSampleConfig +} func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { diskio, err := s.ps.DiskIO() @@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting disk io info: %s", err) } + var restrictDevices bool + devices := make(map[string]bool) + if len(s.Devices) != 0 { + restrictDevices = true + for _, dev := range s.Devices { + devices[dev] = true + } + } + for _, io := range diskio { + _, member := devices[io.Name] + if restrictDevices && !member { + continue + } tags := map[string]string{} if len(io.Name) != 0 { tags["name"] = io.Name } - if len(io.SerialNumber) != 0 { + if len(io.SerialNumber) != 0 && !s.SkipSerialNumber { tags["serial"] = io.SerialNumber } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 5839c8c60..389965e3b 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) { mps.On("DiskUsage").Return(du, nil) - diskio := disk.DiskIOCountersStat{ + diskio1 := disk.DiskIOCountersStat{ + ReadCount: 888, WriteCount: 5341, ReadBytes: 100000, @@ -84,8 +85,19 @@ func TestSystemStats_GenerateStats(t *testing.T) { IoTime: 123552, SerialNumber: "ab-123-ad", } + diskio2 := disk.DiskIOCountersStat{ + ReadCount: 444, + WriteCount: 2341, + ReadBytes: 200000, + WriteBytes: 400000, + ReadTime: 3123, + WriteTime: 6087, + Name: "sdb1", + IoTime: 246552, + SerialNumber: "bb-123-ad", + } - mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil) + mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil) netio := net.NetIOCountersStat{ Name: "eth0", @@ -262,21 +274,55 @@ func TestSystemStats_GenerateStats(t *testing.T) { assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) - err = (&DiskIOStats{&mps}).Gather(&acc) + preDiskIOPoints := len(acc.Points) + + err = (&DiskIOStats{ps: &mps}).Gather(&acc) require.NoError(t, err) - dtags := map[string]string{ + numDiskIOPoints := len(acc.Points) - preDiskIOPoints + expectedAllDiskIOPoints := 14 + assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) + + dtags1 := map[string]string{ "name": "sda1", "serial": "ab-123-ad", } + dtags2 := map[string]string{ + "name": "sdb1", + "serial": "bb-123-ad", + } - assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) + + // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" + // and serial should be missing from the tags with SkipSerialNumber set + err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) + assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points)) + + dtags3 := map[string]string{ + "name": "sdb1", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) err = (&MemStats{&mps}).Gather(&acc) require.NoError(t, err)