Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Roman Statsevich 2015-11-09 15:19:36 +03:00
commit 0e32fa03c5
16 changed files with 684 additions and 68 deletions

View File

@ -6,11 +6,13 @@ changed to just run docker commands in the Makefile. See `make docker-run` and
`make docker-kill`. `make test` will still run all unit tests with docker. `make docker-kill`. `make test` will still run all unit tests with docker.
- Long unit tests are now run in CircleCI, with docker & race detector - Long unit tests are now run in CircleCI, with docker & race detector
- Redis plugin tag has changed from `host` to `server` - Redis plugin tag has changed from `host` to `server`
- HAProxy plugin tag has changed from `host` to `server`
### Features ### Features
- [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive!
- [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter!
- [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac!
- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello!
### Bugfixes ### Bugfixes
- [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin.

View File

@ -28,6 +28,5 @@
- github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE) - github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE)
- gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE) - gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
- gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE) - gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- golang.org/x/crypto/* [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- golang.org/x/crypto/blowfish
- golang.org/x/crypto/bcrypt

View File

@ -76,9 +76,12 @@ if you don't have it already. You also must build with golang version 1.4+.
* Run `telegraf -sample-config > telegraf.conf` to create an initial configuration. * Run `telegraf -sample-config > telegraf.conf` to create an initial configuration.
* Or run `telegraf -sample-config -filter cpu:mem -outputfilter influxdb > telegraf.conf`. * Or run `telegraf -sample-config -filter cpu:mem -outputfilter influxdb > telegraf.conf`.
to create a config file with only CPU and memory plugins defined, and InfluxDB output defined. to create a config file with only CPU and memory plugins defined, and InfluxDB
output defined.
* Edit the configuration to match your needs. * Edit the configuration to match your needs.
* Run `telegraf -config telegraf.conf -test` to output one full measurement sample to STDOUT. * Run `telegraf -config telegraf.conf -test` to output one full measurement
sample to STDOUT. NOTE: you may want to run as the telegraf user if you are using
the linux packages `sudo -u telegraf telegraf -config telegraf.conf -test`
* Run `telegraf -config telegraf.conf` to gather and send metrics to configured outputs. * Run `telegraf -config telegraf.conf` to gather and send metrics to configured outputs.
* Run `telegraf -config telegraf.conf -filter system:swap`. * Run `telegraf -config telegraf.conf -filter system:swap`.
to run telegraf with only the system & swap plugins defined in the config. to run telegraf with only the system & swap plugins defined in the config.
@ -170,6 +173,7 @@ Telegraf currently has support for collecting metrics from:
* exec (generic JSON-emitting executable plugin) * exec (generic JSON-emitting executable plugin)
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* jolokia (remote JMX with JSON over HTTP)
* kafka_consumer * kafka_consumer
* leofs * leofs
* lustre2 * lustre2

View File

@ -9,6 +9,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/httpjson"
_ "github.com/influxdb/telegraf/plugins/jolokia"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/leofs"
_ "github.com/influxdb/telegraf/plugins/lustre2" _ "github.com/influxdb/telegraf/plugins/lustre2"

View File

@ -1,6 +1,7 @@
package bcache package bcache
import ( import (
"errors"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -34,17 +35,6 @@ func (b *Bcache) Description() string {
return "Read metrics of bcache from stats_total and dirty_data" return "Read metrics of bcache from stats_total and dirty_data"
} }
func getBackingDevs(bcachePath string) []string {
bdevs, err := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 {
panic("Can't found any bcache device")
}
if err != nil {
panic(err)
}
return bdevs
}
func getTags(bdev string) map[string]string { func getTags(bdev string) map[string]string {
backingDevFile, _ := os.Readlink(bdev) backingDevFile, _ := os.Readlink(bdev)
backingDevPath := strings.Split(backingDevFile, "/") backingDevPath := strings.Split(backingDevFile, "/")
@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
tags := getTags(bdev) tags := getTags(bdev)
metrics, err := filepath.Glob(bdev + "/stats_total/*") metrics, err := filepath.Glob(bdev + "/stats_total/*")
if len(metrics) < 0 { if len(metrics) < 0 {
panic("Can't read any stats file") return errors.New("Can't read any stats file")
} }
file, err := ioutil.ReadFile(bdev + "/dirty_data") file, err := ioutil.ReadFile(bdev + "/dirty_data")
if err != nil { if err != nil {
panic(err) return err
} }
rawValue := strings.TrimSpace(string(file)) rawValue := strings.TrimSpace(string(file))
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
file, err := ioutil.ReadFile(path) file, err := ioutil.ReadFile(path)
rawValue := strings.TrimSpace(string(file)) rawValue := strings.TrimSpace(string(file))
if err != nil { if err != nil {
panic(err) return err
} }
if key == "bypassed" { if key == "bypassed" {
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
if len(bcachePath) == 0 { if len(bcachePath) == 0 {
bcachePath = "/sys/fs/bcache" bcachePath = "/sys/fs/bcache"
} }
for _, bdev := range getBackingDevs(bcachePath) { bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 {
return errors.New("Can't found any bcache device")
}
for _, bdev := range bdevs {
if restrictDevs { if restrictDevs {
bcacheDev := getTags(bdev)["bcache_dev"] bcacheDev := getTags(bdev)["bcache_dev"]
if !bcacheDevsChecked[bcacheDev] { if !bcacheDevsChecked[bcacheDev] {

View File

@ -165,7 +165,7 @@ func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]str
for field, v := range row { for field, v := range row {
tags := map[string]string{ tags := map[string]string{
"host": host, "server": host,
"proxy": row[HF_PXNAME], "proxy": row[HF_PXNAME],
"sv": row[HF_SVNAME], "sv": row[HF_SVNAME],
} }

View File

@ -42,7 +42,7 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
tags := map[string]string{ tags := map[string]string{
"host": ts.Listener.Addr().String(), "server": ts.Listener.Addr().String(),
"proxy": "be_app", "proxy": "be_app",
"sv": "host0", "sv": "host0",
} }
@ -110,7 +110,7 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
tags := map[string]string{ tags := map[string]string{
"proxy": "be_app", "proxy": "be_app",
"host": ts.Listener.Addr().String(), "server": ts.Listener.Addr().String(),
"sv": "host0", "sv": "host0",
} }

View File

@ -2,7 +2,8 @@
The httpjson plugin can collect data from remote URLs which respond with JSON. Then it flattens JSON and finds all numeric values, treating them as floats. The httpjson plugin can collect data from remote URLs which respond with JSON. Then it flattens JSON and finds all numeric values, treating them as floats.
For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats http://my.service.com/_stats: For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats at http://my.service.com/_stats, you would configure the HTTP JSON
plugin like this:
``` ```
[[httpjson.services]] [[httpjson.services]]
@ -16,11 +17,11 @@ For example, if you have a service called _mycollector_, which has HTTP endpoint
method = "GET" method = "GET"
``` ```
The name is used as a prefix for the measurements. `name` is used as a prefix for the measurements.
The `method` specifies HTTP method to use for requests. `method` specifies HTTP method to use for requests.
You can specify which keys from server response should be considered as tags: You can also specify which keys from server response should be considered tags:
``` ```
[[httpjson.services]] [[httpjson.services]]
@ -32,8 +33,6 @@ You can specify which keys from server response should be considered as tags:
] ]
``` ```
**NOTE**: tag values should be strings.
You can also specify additional request parameters for the service: You can also specify additional request parameters for the service:
``` ```
@ -47,11 +46,30 @@ You can also specify additional request parameters for the service:
``` ```
# Sample # Example:
Let's say that we have a service named "mycollector" configured like this:
```
[httpjson]
[[httpjson.services]]
name = "mycollector"
servers = [
"http://my.service.com/_stats"
]
# HTTP method to use (case-sensitive)
method = "GET"
tag_keys = ["service"]
```
which responds with the following JSON:
Let's say that we have a service named "mycollector", which responds with:
```json ```json
{ {
"service": "service01",
"a": 0.5, "a": 0.5,
"b": { "b": {
"c": "some text", "c": "some text",
@ -63,7 +81,68 @@ Let's say that we have a service named "mycollector", which responds with:
The collected metrics will be: The collected metrics will be:
``` ```
httpjson_mycollector_a value=0.5 httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5
httpjson_mycollector_b_d value=0.1 httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1
httpjson_mycollector_b_e value=5 httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5
```
# Example 2, Multiple Services:
There is also the option to collect JSON from multiple services, here is an
example doing that.
```
[httpjson]
[[httpjson.services]]
name = "mycollector1"
servers = [
"http://my.service1.com/_stats"
]
# HTTP method to use (case-sensitive)
method = "GET"
[[httpjson.services]]
name = "mycollector2"
servers = [
"http://service.net/json/stats"
]
# HTTP method to use (case-sensitive)
method = "POST"
```
The services respond with the following JSON:
mycollector1:
```json
{
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
}
}
```
mycollector2:
```json
{
"load": 100,
"users": 1335
}
```
The collected metrics will be:
```
httpjson_mycollector1_a,server='http://my.service.com/_stats' value=0.5
httpjson_mycollector1_b_d,server='http://my.service.com/_stats' value=0.1
httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5
httpjson_mycollector2_load,server='http://service.net/json/stats' value=100
httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335
``` ```

View File

@ -127,7 +127,11 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error {
// //
// Returns: // Returns:
// error: Any error that may have occurred // error: Any error that may have occurred
func (h *HttpJson) gatherServer(acc plugins.Accumulator, service Service, serverURL string) error { func (h *HttpJson) gatherServer(
acc plugins.Accumulator,
service Service,
serverURL string,
) error {
resp, err := h.sendRequest(service, serverURL) resp, err := h.sendRequest(service, serverURL)
if err != nil { if err != nil {
return err return err

51
plugins/jolokia/README.md Normal file
View File

@ -0,0 +1,51 @@
# Telegraf plugin: Jolokia
#### Plugin arguments:
- **context** string: Context root used of jolokia url
- **servers** []Server: List of servers
+ **name** string: Server's logical name
+ **host** string: Server's ip address or hostname
+ **port** string: Server's listening port
- **metrics** []Metric
+ **name** string: Name of the measure
+ **jmx** string: Jmx path that identifies mbeans attributes
+ **pass** []string: Attributes to retain when collecting values
+ **drop** []string: Attributes to drop when collecting values
#### Description
The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics
are collected for each server configured.
See: https://jolokia.org/
# Measurements:
Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags.
Given a configuration like:
```ini
[jolokia]
[[jolokia.servers]]
name = "as-service-1"
host = "127.0.0.1"
port = "8080"
[[jolokia.servers]]
name = "as-service-2"
host = "127.0.0.1"
port = "8180"
[[jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
pass = ["used", "max"]
```
The collected metrics will be:
```
jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy
jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz
```

223
plugins/jolokia/jolokia.go Normal file
View File

@ -0,0 +1,223 @@
package jolokia
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/influxdb/telegraf/plugins"
)
type Server struct {
Name string
Host string
Port string
}
type Metric struct {
Name string
Jmx string
Pass []string
Drop []string
}
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type JolokiaClientImpl struct {
client *http.Client
}
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type Jolokia struct {
jClient JolokiaClient
Context string
Servers []Server
Metrics []Metric
Tags map[string]string
}
func (j *Jolokia) SampleConfig() string {
return `
# This is the context root used to compose the jolokia url
context = "/jolokia/read"
# Tags added to each measurements
[jolokia.tags]
group = "as"
# List of servers exposing jolokia read service
[[jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"
# List of metrics collected on above servers
# Each metric consists in a name, a jmx path and either a pass or drop slice attributes
# This collect all heap memory usage metrics
[[jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
# This drops the 'committed' value from Eden space measurement
[[jolokia.metrics]]
name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
drop = [ "committed" ]
# This passes only DaemonThreadCount and ThreadCount
[[jolokia.metrics]]
name = "heap_threads"
jmx = "/java.lang:type=Threading"
pass = [
"DaemonThreadCount",
"ThreadCount"
]
`
}
func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia"
}
func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func (m *Metric) shouldPass(field string) bool {
if m.Pass != nil {
for _, pass := range m.Pass {
if strings.HasPrefix(field, pass) {
return true
}
}
return false
}
if m.Drop != nil {
for _, drop := range m.Drop {
if strings.HasPrefix(field, drop) {
return false
}
}
return true
}
return true
}
func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {
for field, _ := range fields {
if !m.shouldPass(field) {
delete(fields, field)
}
}
return fields
}
func (j *Jolokia) Gather(acc plugins.Accumulator) error {
context := j.Context //"/jolokia/read"
servers := j.Servers
metrics := j.Metrics
tags := j.Tags
if tags == nil {
tags = map[string]string{}
}
for _, server := range servers {
for _, metric := range metrics {
measurement := metric.Name
jmxPath := metric.Jmx
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
// Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath)
if err != nil {
return err
}
out, _ := j.getAttr(requestUrl)
if values, ok := out["value"]; ok {
switch values.(type) {
case map[string]interface{}:
acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags)
case interface{}:
acc.Add(measurement, values.(interface{}), tags)
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String())
}
}
}
return nil
}
func init() {
plugins.Add("jolokia", func() plugins.Plugin {
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}

View File

@ -0,0 +1,147 @@
package jolokia
import (
_ "fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
_ "github.com/stretchr/testify/require"
)
const validMultiValueJSON = `
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":{
"init":67108864,
"committed":456130560,
"max":477626368,
"used":203288528
},
"timestamp":1446129191,
"status":200
}`
const validSingleValueJSON = `
{
"request":{
"path":"used",
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":209274376,
"timestamp":1446129256,
"status":200
}`
const invalidJSON = "I don't think this is JSON"
const empty = ""
var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}}
var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}}
type jolokiaClientStub struct {
responseBody string
statusCode int
}
func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}
// Generates a pointer to an HttpJson object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia {
return &Jolokia{
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
Servers: servers,
Metrics: metrics,
}
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValue(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0,
"committed": 456130560.0,
"max": 477626368.0,
"used": 203288528.0}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueWithPass(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonSingleValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Points))
}

View File

@ -74,7 +74,11 @@ func (k *Kafka) Gather(acc plugins.Accumulator) error {
k.Consumer.Close() k.Consumer.Close()
}() }()
go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt) go readFromKafka(k.Consumer.Messages(),
metricQueue,
k.BatchSize,
k.Consumer.CommitUpto,
halt)
} }
return emitMetrics(k, acc, metricQueue) return emitMetrics(k, acc, metricQueue)
@ -105,7 +109,13 @@ const millisecond = 1000000 * time.Nanosecond
type ack func(*sarama.ConsumerMessage) error type ack func(*sarama.ConsumerMessage) error
func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) { func readFromKafka(
kafkaMsgs <-chan *sarama.ConsumerMessage,
metricProducer chan<- []byte,
maxBatchSize int,
ackMsg ack,
halt <-chan bool,
) {
batch := make([]byte, 0) batch := make([]byte, 0)
currentBatchSize := 0 currentBatchSize := 0
timeout := time.After(500 * millisecond) timeout := time.After(500 * millisecond)

View File

@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string {
var diskSampleConfig = ` var diskSampleConfig = `
# By default, telegraf gather stats for all mountpoints. # By default, telegraf gather stats for all mountpoints.
# Setting mountpoints will restrict the stats to the specified ones. # Setting mountpoints will restrict the stats to the specified mountpoints.
# mountpoints.
# Mountpoints=["/"] # Mountpoints=["/"]
` `
@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {
type DiskIOStats struct { type DiskIOStats struct {
ps PS ps PS
Devices []string
SkipSerialNumber bool
} }
func (_ *DiskIOStats) Description() string { func (_ *DiskIOStats) Description() string {
return "Read metrics about disk IO by device" return "Read metrics about disk IO by device"
} }
func (_ *DiskIOStats) SampleConfig() string { return "" } var diskIoSampleConfig = `
# By default, telegraf will gather stats for all devices including
# disk partitions.
# Setting devices will restrict the stats to the specified devcies.
# Devices=["sda","sdb"]
# Uncomment the following line if you do not need disk serial numbers.
# SkipSerialNumber = true
`
func (_ *DiskIOStats) SampleConfig() string {
return diskIoSampleConfig
}
func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
diskio, err := s.ps.DiskIO() diskio, err := s.ps.DiskIO()
@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("error getting disk io info: %s", err) return fmt.Errorf("error getting disk io info: %s", err)
} }
var restrictDevices bool
devices := make(map[string]bool)
if len(s.Devices) != 0 {
restrictDevices = true
for _, dev := range s.Devices {
devices[dev] = true
}
}
for _, io := range diskio { for _, io := range diskio {
_, member := devices[io.Name]
if restrictDevices && !member {
continue
}
tags := map[string]string{} tags := map[string]string{}
if len(io.Name) != 0 { if len(io.Name) != 0 {
tags["name"] = io.Name tags["name"] = io.Name
} }
if len(io.SerialNumber) != 0 { if len(io.SerialNumber) != 0 && !s.SkipSerialNumber {
tags["serial"] = io.SerialNumber tags["serial"] = io.SerialNumber
} }

View File

@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) {
mps.On("DiskUsage").Return(du, nil) mps.On("DiskUsage").Return(du, nil)
diskio := disk.DiskIOCountersStat{ diskio1 := disk.DiskIOCountersStat{
ReadCount: 888, ReadCount: 888,
WriteCount: 5341, WriteCount: 5341,
ReadBytes: 100000, ReadBytes: 100000,
@ -84,8 +85,19 @@ func TestSystemStats_GenerateStats(t *testing.T) {
IoTime: 123552, IoTime: 123552,
SerialNumber: "ab-123-ad", SerialNumber: "ab-123-ad",
} }
diskio2 := disk.DiskIOCountersStat{
ReadCount: 444,
WriteCount: 2341,
ReadBytes: 200000,
WriteBytes: 400000,
ReadTime: 3123,
WriteTime: 6087,
Name: "sdb1",
IoTime: 246552,
SerialNumber: "bb-123-ad",
}
mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil) mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil)
netio := net.NetIOCountersStat{ netio := net.NetIOCountersStat{
Name: "eth0", Name: "eth0",
@ -262,21 +274,55 @@ func TestSystemStats_GenerateStats(t *testing.T) {
assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
err = (&DiskIOStats{&mps}).Gather(&acc) preDiskIOPoints := len(acc.Points)
err = (&DiskIOStats{ps: &mps}).Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
dtags := map[string]string{ numDiskIOPoints := len(acc.Points) - preDiskIOPoints
expectedAllDiskIOPoints := 14
assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
dtags1 := map[string]string{
"name": "sda1", "name": "sda1",
"serial": "ab-123-ad", "serial": "ab-123-ad",
} }
dtags2 := map[string]string{
"name": "sdb1",
"serial": "bb-123-ad",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags)) assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags)) assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags)) assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags)) assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags)) assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags)) assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags)) assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
// and serial should be missing from the tags with SkipSerialNumber set
err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points))
dtags3 := map[string]string{
"name": "sdb1",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
err = (&MemStats{&mps}).Gather(&acc) err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) {
return nil, false return nil, false
} }
// CheckValue calls CheckFieldsValue passing a single-value map as fields
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val})
}
// CheckValue checks that the accumulators point for the given measurement // CheckValue checks that the accumulators point for the given measurement
// is the same as the given value. // is the same as the given value.
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool {
for _, p := range a.Points { for _, p := range a.Points {
if p.Measurement == measurement { if p.Measurement == measurement {
return p.Values["value"] == val return reflect.DeepEqual(fields, p.Values)
} }
} }
fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields)
return false return false
} }
@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue(
return a.ValidateTaggedValue(measurement, val, tags) == nil return a.ValidateTaggedValue(measurement, val, tags) == nil
} }
// ValidateTaggedValue validates that the given measurement and value exist // ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields
// in the accumulator and with the given tags.
func (a *Accumulator) ValidateTaggedValue( func (a *Accumulator) ValidateTaggedValue(
measurement string, measurement string,
val interface{}, val interface{},
tags map[string]string, tags map[string]string,
) error {
return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags)
}
// ValidateValue calls ValidateTaggedValue
func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
return a.ValidateTaggedValue(measurement, val, nil)
}
// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue
func (a *Accumulator) CheckTaggedFieldsValue(
measurement string,
fields map[string]interface{},
tags map[string]string,
) bool {
return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil
}
// ValidateTaggedValue validates that the given measurement and value exist
// in the accumulator and with the given tags.
func (a *Accumulator) ValidateTaggedFieldsValue(
measurement string,
fields map[string]interface{},
tags map[string]string,
) error { ) error {
if tags == nil { if tags == nil {
tags = map[string]string{} tags = map[string]string{}
@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue(
} }
if p.Measurement == measurement { if p.Measurement == measurement {
if p.Values["value"] != val { if !reflect.DeepEqual(fields, p.Values) {
return fmt.Errorf("%v (%T) != %v (%T)", return fmt.Errorf("%v != %v ", fields, p.Values)
p.Values["value"], p.Values["value"], val, val)
} }
return nil return nil
} }
@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue(
return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags)
} }
// ValidateValue calls ValidateTaggedValue // ValidateFieldsValue calls ValidateTaggedFieldsValue
func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { func (a *Accumulator) ValidateFieldsValue(
return a.ValidateTaggedValue(measurement, val, nil) measurement string,
fields map[string]interface{},
) error {
return a.ValidateTaggedValue(measurement, fields, nil)
} }
func (a *Accumulator) ValidateTaggedFields( func (a *Accumulator) ValidateTaggedFields(