diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e10bd93d..ddb7388b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,13 @@ changed to just run docker commands in the Makefile. See `make docker-run` and `make docker-kill`. `make test` will still run all unit tests with docker. - Long unit tests are now run in CircleCI, with docker & race detector - Redis plugin tag has changed from `host` to `server` +- HAProxy plugin tag has changed from `host` to `server` ### Features - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! +- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! ### Bugfixes - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index c16a73007..417d47d0e 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -28,6 +28,5 @@ - github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE) - gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE) - gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE) +- golang.org/x/crypto/* [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- golang.org/x/crypto/blowfish -- golang.org/x/crypto/bcrypt diff --git a/README.md b/README.md index aacf7a9ae..ae89e57d2 100644 --- a/README.md +++ b/README.md @@ -76,9 +76,12 @@ if you don't have it already. You also must build with golang version 1.4+. * Run `telegraf -sample-config > telegraf.conf` to create an initial configuration. * Or run `telegraf -sample-config -filter cpu:mem -outputfilter influxdb > telegraf.conf`. -to create a config file with only CPU and memory plugins defined, and InfluxDB output defined. +to create a config file with only CPU and memory plugins defined, and InfluxDB +output defined. * Edit the configuration to match your needs. -* Run `telegraf -config telegraf.conf -test` to output one full measurement sample to STDOUT. +* Run `telegraf -config telegraf.conf -test` to output one full measurement +sample to STDOUT. NOTE: you may want to run as the telegraf user if you are using +the linux packages `sudo -u telegraf telegraf -config telegraf.conf -test` * Run `telegraf -config telegraf.conf` to gather and send metrics to configured outputs. * Run `telegraf -config telegraf.conf -filter system:swap`. to run telegraf with only the system & swap plugins defined in the config. @@ -170,6 +173,7 @@ Telegraf currently has support for collecting metrics from: * exec (generic JSON-emitting executable plugin) * haproxy * httpjson (generic JSON-emitting http service plugin) +* jolokia (remote JMX with JSON over HTTP) * kafka_consumer * leofs * lustre2 diff --git a/plugins/all/all.go b/plugins/all/all.go index 452300eda..977b980b8 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/bcache/bcache.go b/plugins/bcache/bcache.go index ee63f3c48..76e638ea4 100644 --- a/plugins/bcache/bcache.go +++ b/plugins/bcache/bcache.go @@ -1,6 +1,7 @@ package bcache import ( + "errors" "io/ioutil" "os" "path/filepath" @@ -34,17 +35,6 @@ func (b *Bcache) Description() string { return "Read metrics of bcache from stats_total and dirty_data" } -func getBackingDevs(bcachePath string) []string { - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 { - panic("Can't found any bcache device") - } - if err != nil { - panic(err) - } - return bdevs -} - func getTags(bdev string) map[string]string { backingDevFile, _ := os.Readlink(bdev) backingDevPath := strings.Split(backingDevFile, "/") @@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { tags := getTags(bdev) metrics, err := filepath.Glob(bdev + "/stats_total/*") if len(metrics) < 0 { - panic("Can't read any stats file") + return errors.New("Can't read any stats file") } file, err := ioutil.ReadFile(bdev + "/dirty_data") if err != nil { - panic(err) + return err } rawValue := strings.TrimSpace(string(file)) value := prettyToBytes(rawValue) @@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { file, err := ioutil.ReadFile(path) rawValue := strings.TrimSpace(string(file)) if err != nil { - panic(err) + return err } if key == "bypassed" { value := prettyToBytes(rawValue) @@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error { if len(bcachePath) == 0 { bcachePath = "/sys/fs/bcache" } - for _, bdev := range getBackingDevs(bcachePath) { + bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 { + return errors.New("Can't found any bcache device") + } + for _, bdev := range bdevs { if restrictDevs { bcacheDev := getTags(bdev)["bcache_dev"] if !bcacheDevsChecked[bcacheDev] { diff --git a/plugins/haproxy/haproxy.go b/plugins/haproxy/haproxy.go index a1d14a32a..e28baf1e8 100644 --- a/plugins/haproxy/haproxy.go +++ b/plugins/haproxy/haproxy.go @@ -165,9 +165,9 @@ func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]str for field, v := range row { tags := map[string]string{ - "host": host, - "proxy": row[HF_PXNAME], - "sv": row[HF_SVNAME], + "server": host, + "proxy": row[HF_PXNAME], + "sv": row[HF_SVNAME], } switch field { case HF_QCUR: diff --git a/plugins/haproxy/haproxy_test.go b/plugins/haproxy/haproxy_test.go index 0d63985d4..b87618700 100644 --- a/plugins/haproxy/haproxy_test.go +++ b/plugins/haproxy/haproxy_test.go @@ -42,9 +42,9 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) { require.NoError(t, err) tags := map[string]string{ - "host": ts.Listener.Addr().String(), - "proxy": "be_app", - "sv": "host0", + "server": ts.Listener.Addr().String(), + "proxy": "be_app", + "sv": "host0", } assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags)) @@ -109,9 +109,9 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) { require.NoError(t, err) tags := map[string]string{ - "proxy": "be_app", - "host": ts.Listener.Addr().String(), - "sv": "host0", + "proxy": "be_app", + "server": ts.Listener.Addr().String(), + "sv": "host0", } assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags)) diff --git a/plugins/httpjson/README.md b/plugins/httpjson/README.md index fd8bcb4cb..d016633a7 100644 --- a/plugins/httpjson/README.md +++ b/plugins/httpjson/README.md @@ -2,7 +2,8 @@ The httpjson plugin can collect data from remote URLs which respond with JSON. Then it flattens JSON and finds all numeric values, treating them as floats. -For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats http://my.service.com/_stats: +For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats at http://my.service.com/_stats, you would configure the HTTP JSON +plugin like this: ``` [[httpjson.services]] @@ -16,11 +17,11 @@ For example, if you have a service called _mycollector_, which has HTTP endpoint method = "GET" ``` -The name is used as a prefix for the measurements. +`name` is used as a prefix for the measurements. -The `method` specifies HTTP method to use for requests. +`method` specifies HTTP method to use for requests. -You can specify which keys from server response should be considered as tags: +You can also specify which keys from server response should be considered tags: ``` [[httpjson.services]] @@ -32,8 +33,6 @@ You can specify which keys from server response should be considered as tags: ] ``` -**NOTE**: tag values should be strings. - You can also specify additional request parameters for the service: ``` @@ -47,11 +46,30 @@ You can also specify additional request parameters for the service: ``` -# Sample +# Example: + +Let's say that we have a service named "mycollector" configured like this: + +``` +[httpjson] + [[httpjson.services]] + name = "mycollector" + + servers = [ + "http://my.service.com/_stats" + ] + + # HTTP method to use (case-sensitive) + method = "GET" + + tag_keys = ["service"] +``` + +which responds with the following JSON: -Let's say that we have a service named "mycollector", which responds with: ```json { + "service": "service01", "a": 0.5, "b": { "c": "some text", @@ -63,7 +81,68 @@ Let's say that we have a service named "mycollector", which responds with: The collected metrics will be: ``` -httpjson_mycollector_a value=0.5 -httpjson_mycollector_b_d value=0.1 -httpjson_mycollector_b_e value=5 +httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5 +``` + +# Example 2, Multiple Services: + +There is also the option to collect JSON from multiple services, here is an +example doing that. + +``` +[httpjson] + [[httpjson.services]] + name = "mycollector1" + + servers = [ + "http://my.service1.com/_stats" + ] + + # HTTP method to use (case-sensitive) + method = "GET" + + [[httpjson.services]] + name = "mycollector2" + + servers = [ + "http://service.net/json/stats" + ] + + # HTTP method to use (case-sensitive) + method = "POST" +``` + +The services respond with the following JSON: + +mycollector1: +```json +{ + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + } +} +``` + +mycollector2: +```json +{ + "load": 100, + "users": 1335 +} +``` + +The collected metrics will be: + +``` +httpjson_mycollector1_a,server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector1_b_d,server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5 + +httpjson_mycollector2_load,server='http://service.net/json/stats' value=100 +httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335 ``` diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go index a3f02e65f..b5913e2e1 100644 --- a/plugins/httpjson/httpjson.go +++ b/plugins/httpjson/httpjson.go @@ -127,7 +127,11 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error { // // Returns: // error: Any error that may have occurred -func (h *HttpJson) gatherServer(acc plugins.Accumulator, service Service, serverURL string) error { +func (h *HttpJson) gatherServer( + acc plugins.Accumulator, + service Service, + serverURL string, +) error { resp, err := h.sendRequest(service, serverURL) if err != nil { return err diff --git a/plugins/jolokia/README.md b/plugins/jolokia/README.md new file mode 100644 index 000000000..bda0c5f93 --- /dev/null +++ b/plugins/jolokia/README.md @@ -0,0 +1,51 @@ +# Telegraf plugin: Jolokia + +#### Plugin arguments: +- **context** string: Context root used of jolokia url +- **servers** []Server: List of servers + + **name** string: Server's logical name + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port +- **metrics** []Metric + + **name** string: Name of the measure + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values + + **drop** []string: Attributes to drop when collecting values + +#### Description + +The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics +are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags. + +Given a configuration like: + +```ini +[jolokia] + +[[jolokia.servers]] + name = "as-service-1" + host = "127.0.0.1" + port = "8080" + +[[jolokia.servers]] + name = "as-service-2" + host = "127.0.0.1" + port = "8180" + +[[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used", "max"] +``` + +The collected metrics will be: + +``` +jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy +jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz +``` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go new file mode 100644 index 000000000..1ece12cf1 --- /dev/null +++ b/plugins/jolokia/jolokia.go @@ -0,0 +1,223 @@ +package jolokia + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/influxdb/telegraf/plugins" +) + +type Server struct { + Name string + Host string + Port string +} + +type Metric struct { + Name string + Jmx string + Pass []string + Drop []string +} + +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type Jolokia struct { + jClient JolokiaClient + Context string + Servers []Server + Metrics []Metric + Tags map[string]string +} + +func (j *Jolokia) SampleConfig() string { + return ` + # This is the context root used to compose the jolokia url + context = "/jolokia/read" + + # Tags added to each measurements + [jolokia.tags] + group = "as" + + # List of servers exposing jolokia read service + [[jolokia.servers]] + name = "stable" + host = "192.168.103.2" + port = "8180" + + # List of metrics collected on above servers + # Each metric consists in a name, a jmx path and either a pass or drop slice attributes + # This collect all heap memory usage metrics + [[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + + + # This drops the 'committed' value from Eden space measurement + [[jolokia.metrics]] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + drop = [ "committed" ] + + + # This passes only DaemonThreadCount and ThreadCount + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + pass = [ + "DaemonThreadCount", + "ThreadCount" + ] +` +} + +func (j *Jolokia) Description() string { + return "Read JMX metrics through Jolokia" +} + +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (m *Metric) shouldPass(field string) bool { + + if m.Pass != nil { + + for _, pass := range m.Pass { + if strings.HasPrefix(field, pass) { + return true + } + } + + return false + } + + if m.Drop != nil { + + for _, drop := range m.Drop { + if strings.HasPrefix(field, drop) { + return false + } + } + + return true + } + + return true +} + +func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { + + for field, _ := range fields { + if !m.shouldPass(field) { + delete(fields, field) + } + } + + return fields +} + +func (j *Jolokia) Gather(acc plugins.Accumulator) error { + + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics + tags := j.Tags + + if tags == nil { + tags = map[string]string{} + } + + for _, server := range servers { + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := metric.Jmx + + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) + if err != nil { + return err + } + + out, _ := j.getAttr(requestUrl) + + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + } + } + } + + return nil +} + +func init() { + plugins.Add("jolokia", func() plugins.Plugin { + return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} + }) +} diff --git a/plugins/jolokia/jolokia_test.go b/plugins/jolokia/jolokia_test.go new file mode 100644 index 000000000..95df76e7b --- /dev/null +++ b/plugins/jolokia/jolokia_test.go @@ -0,0 +1,147 @@ +package jolokia + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} +var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia { + return &Jolokia{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValue(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, + "committed": 456130560.0, + "max": 477626368.0, + "used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueWithPass(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonSingleValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Points)) +} diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go index 7c1258944..1c0fbe266 100644 --- a/plugins/kafka_consumer/kafka_consumer.go +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -74,7 +74,11 @@ func (k *Kafka) Gather(acc plugins.Accumulator) error { k.Consumer.Close() }() - go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt) + go readFromKafka(k.Consumer.Messages(), + metricQueue, + k.BatchSize, + k.Consumer.CommitUpto, + halt) } return emitMetrics(k, acc, metricQueue) @@ -105,7 +109,13 @@ const millisecond = 1000000 * time.Nanosecond type ack func(*sarama.ConsumerMessage) error -func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) { +func readFromKafka( + kafkaMsgs <-chan *sarama.ConsumerMessage, + metricProducer chan<- []byte, + maxBatchSize int, + ackMsg ack, + halt <-chan bool, +) { batch := make([]byte, 0) currentBatchSize := 0 timeout := time.After(500 * millisecond) diff --git a/plugins/system/disk.go b/plugins/system/disk.go index 718d79949..784dcdb13 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string { var diskSampleConfig = ` # By default, telegraf gather stats for all mountpoints. - # Setting mountpoints will restrict the stats to the specified ones. - # mountpoints. + # Setting mountpoints will restrict the stats to the specified mountpoints. # Mountpoints=["/"] ` @@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error { type DiskIOStats struct { ps PS + + Devices []string + SkipSerialNumber bool } func (_ *DiskIOStats) Description() string { return "Read metrics about disk IO by device" } -func (_ *DiskIOStats) SampleConfig() string { return "" } +var diskIoSampleConfig = ` + # By default, telegraf will gather stats for all devices including + # disk partitions. + # Setting devices will restrict the stats to the specified devcies. + # Devices=["sda","sdb"] + # Uncomment the following line if you do not need disk serial numbers. + # SkipSerialNumber = true +` + +func (_ *DiskIOStats) SampleConfig() string { + return diskIoSampleConfig +} func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { diskio, err := s.ps.DiskIO() @@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting disk io info: %s", err) } + var restrictDevices bool + devices := make(map[string]bool) + if len(s.Devices) != 0 { + restrictDevices = true + for _, dev := range s.Devices { + devices[dev] = true + } + } + for _, io := range diskio { + _, member := devices[io.Name] + if restrictDevices && !member { + continue + } tags := map[string]string{} if len(io.Name) != 0 { tags["name"] = io.Name } - if len(io.SerialNumber) != 0 { + if len(io.SerialNumber) != 0 && !s.SkipSerialNumber { tags["serial"] = io.SerialNumber } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 5839c8c60..389965e3b 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) { mps.On("DiskUsage").Return(du, nil) - diskio := disk.DiskIOCountersStat{ + diskio1 := disk.DiskIOCountersStat{ + ReadCount: 888, WriteCount: 5341, ReadBytes: 100000, @@ -84,8 +85,19 @@ func TestSystemStats_GenerateStats(t *testing.T) { IoTime: 123552, SerialNumber: "ab-123-ad", } + diskio2 := disk.DiskIOCountersStat{ + ReadCount: 444, + WriteCount: 2341, + ReadBytes: 200000, + WriteBytes: 400000, + ReadTime: 3123, + WriteTime: 6087, + Name: "sdb1", + IoTime: 246552, + SerialNumber: "bb-123-ad", + } - mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil) + mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil) netio := net.NetIOCountersStat{ Name: "eth0", @@ -262,21 +274,55 @@ func TestSystemStats_GenerateStats(t *testing.T) { assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) - err = (&DiskIOStats{&mps}).Gather(&acc) + preDiskIOPoints := len(acc.Points) + + err = (&DiskIOStats{ps: &mps}).Gather(&acc) require.NoError(t, err) - dtags := map[string]string{ + numDiskIOPoints := len(acc.Points) - preDiskIOPoints + expectedAllDiskIOPoints := 14 + assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) + + dtags1 := map[string]string{ "name": "sda1", "serial": "ab-123-ad", } + dtags2 := map[string]string{ + "name": "sdb1", + "serial": "bb-123-ad", + } - assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) + + // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" + // and serial should be missing from the tags with SkipSerialNumber set + err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) + assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points)) + + dtags3 := map[string]string{ + "name": "sdb1", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) err = (&MemStats{&mps}).Gather(&acc) require.NoError(t, err) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 361f81e9d..d8f44ddf8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue calls CheckFieldsValue passing a single-value map as fields +func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { + return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val}) +} + // CheckValue checks that the accumulators point for the given measurement // is the same as the given value. -func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { +func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Values["value"] == val + return reflect.DeepEqual(fields, p.Values) } } - fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) + fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields) return false } @@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue( return a.ValidateTaggedValue(measurement, val, tags) == nil } -// ValidateTaggedValue validates that the given measurement and value exist -// in the accumulator and with the given tags. +// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields func (a *Accumulator) ValidateTaggedValue( measurement string, val interface{}, tags map[string]string, +) error { + return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags) +} + +// ValidateValue calls ValidateTaggedValue +func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { + return a.ValidateTaggedValue(measurement, val, nil) +} + +// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) CheckTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, +) bool { + return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil +} + +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, ) error { if tags == nil { tags = map[string]string{} @@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue( } if p.Measurement == measurement { - if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", - p.Values["value"], p.Values["value"], val, val) + if !reflect.DeepEqual(fields, p.Values) { + return fmt.Errorf("%v != %v ", fields, p.Values) } return nil } @@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue( return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } -// ValidateValue calls ValidateTaggedValue -func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { - return a.ValidateTaggedValue(measurement, val, nil) +// ValidateFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) ValidateFieldsValue( + measurement string, + fields map[string]interface{}, +) error { + return a.ValidateTaggedValue(measurement, fields, nil) } func (a *Accumulator) ValidateTaggedFields(