Sync with upstream

martinrusev 2015-11-06 13:01:13 +02:00
commit d9d461cf79
10 changed files with 562 additions and 41 deletions


@@ -13,6 +13,8 @@ changed to just run docker commands in the Makefile. See `make docker-run` and
 - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter!
 - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac!
 - [#350](https://github.com/influxdb/telegraf/pull/350): Amon output.
+- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello!

 ### Bugfixes
 - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin.


@@ -173,6 +173,7 @@ Telegraf currently has support for collecting metrics from:
 * exec (generic JSON-emitting executable plugin)
 * haproxy
 * httpjson (generic JSON-emitting http service plugin)
+* jolokia (remote JMX with JSON over HTTP)
 * kafka_consumer
 * leofs
 * lustre2


@@ -9,6 +9,7 @@ import (
 	_ "github.com/influxdb/telegraf/plugins/exec"
 	_ "github.com/influxdb/telegraf/plugins/haproxy"
 	_ "github.com/influxdb/telegraf/plugins/httpjson"
+	_ "github.com/influxdb/telegraf/plugins/jolokia"
 	_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
 	_ "github.com/influxdb/telegraf/plugins/leofs"
 	_ "github.com/influxdb/telegraf/plugins/lustre2"


@@ -1,6 +1,7 @@
 package bcache

 import (
+	"errors"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -34,17 +35,6 @@ func (b *Bcache) Description() string {
 	return "Read metrics of bcache from stats_total and dirty_data"
 }

-func getBackingDevs(bcachePath string) []string {
-	bdevs, err := filepath.Glob(bcachePath + "/*/bdev*")
-	if len(bdevs) < 1 {
-		panic("Can't found any bcache device")
-	}
-	if err != nil {
-		panic(err)
-	}
-	return bdevs
-}
-
 func getTags(bdev string) map[string]string {
 	backingDevFile, _ := os.Readlink(bdev)
 	backingDevPath := strings.Split(backingDevFile, "/")
@@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
 	tags := getTags(bdev)
 	metrics, err := filepath.Glob(bdev + "/stats_total/*")
 	if len(metrics) < 0 {
-		panic("Can't read any stats file")
+		return errors.New("Can't read any stats file")
 	}
 	file, err := ioutil.ReadFile(bdev + "/dirty_data")
 	if err != nil {
-		panic(err)
+		return err
 	}
 	rawValue := strings.TrimSpace(string(file))
 	value := prettyToBytes(rawValue)
@@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
 		file, err := ioutil.ReadFile(path)
 		rawValue := strings.TrimSpace(string(file))
 		if err != nil {
-			panic(err)
+			return err
 		}
 		if key == "bypassed" {
 			value := prettyToBytes(rawValue)
@@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
 	if len(bcachePath) == 0 {
 		bcachePath = "/sys/fs/bcache"
 	}
-	for _, bdev := range getBackingDevs(bcachePath) {
+	bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
+	if len(bdevs) < 1 {
+		return errors.New("Can't find any bcache device")
+	}
+	for _, bdev := range bdevs {
 		if restrictDevs {
 			bcacheDev := getTags(bdev)["bcache_dev"]
 			if !bcacheDevsChecked[bcacheDev] {

plugins/jolokia/README.md (new file, 51 lines)

@@ -0,0 +1,51 @@
# Telegraf plugin: Jolokia
#### Plugin arguments:
- **context** string: Context root used to compose the Jolokia URL
- **servers** []Server: List of servers
+ **name** string: Server's logical name
+ **host** string: Server's IP address or hostname
+ **port** string: Server's listening port
- **metrics** []Metric
+ **name** string: Name of the measurement
+ **jmx** string: JMX path that identifies the MBean attributes
+ **pass** []string: Attributes to retain when collecting values
+ **drop** []string: Attributes to drop when collecting values
#### Description
The Jolokia plugin collects JVM metrics exposed as MBean attributes through the Jolokia REST endpoint. All
configured metrics are collected for each configured server.
See: https://jolokia.org/
# Measurements:
The Jolokia plugin produces one measurement for each configured metric, adding the server's `name`, `host` and `port` as tags.
Given a configuration like:
```ini
[jolokia]
[[jolokia.servers]]
name = "as-service-1"
host = "127.0.0.1"
port = "8080"
[[jolokia.servers]]
name = "as-service-2"
host = "127.0.0.1"
port = "8180"
[[jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
pass = ["used", "max"]
```
The collected metrics will be:
```
jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy
jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz
```
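
For the configuration above, the plugin issues one HTTP GET per server/metric pair, composing the read URL from the context root, the server address, and the JMX path (see `Gather` in `jolokia.go` below). A minimal sketch of that composition; the `buildReadURL` helper is illustrative and not part of the plugin:

```go
package main

import (
	"fmt"
	"net/url"
)

// buildReadURL joins the context root, server address and JMX path the same
// way the plugin does before issuing the GET request (illustrative helper).
func buildReadURL(host, port, context, jmx string) (*url.URL, error) {
	return url.Parse("http://" + host + ":" + port + context + jmx)
}

func main() {
	u, err := buildReadURL("127.0.0.1", "8080", "/jolokia/read",
		"/java.lang:type=Memory/HeapMemoryUsage")
	if err != nil {
		panic(err)
	}
	// Prints: http://127.0.0.1:8080/jolokia/read/java.lang:type=Memory/HeapMemoryUsage
	fmt.Println(u)
}
```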

plugins/jolokia/jolokia.go (new file, 223 lines)

@@ -0,0 +1,223 @@
package jolokia
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/influxdb/telegraf/plugins"
)
type Server struct {
Name string
Host string
Port string
}
type Metric struct {
Name string
Jmx string
Pass []string
Drop []string
}
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type JolokiaClientImpl struct {
client *http.Client
}
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type Jolokia struct {
jClient JolokiaClient
Context string
Servers []Server
Metrics []Metric
Tags map[string]string
}
func (j *Jolokia) SampleConfig() string {
return `
# This is the context root used to compose the jolokia url
context = "/jolokia/read"
# Tags added to each measurement
[jolokia.tags]
group = "as"
# List of servers exposing jolokia read service
[[jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"
# List of metrics collected on above servers
# Each metric consists of a name, a JMX path and either a pass or a drop slice of attributes
# This collects all heap memory usage metrics
[[jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
# This drops the 'committed' value from Eden space measurement
[[jolokia.metrics]]
name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
drop = [ "committed" ]
# This passes only DaemonThreadCount and ThreadCount
[[jolokia.metrics]]
name = "heap_threads"
jmx = "/java.lang:type=Threading"
pass = [
"DaemonThreadCount",
"ThreadCount"
]
`
}
func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia"
}
func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func (m *Metric) shouldPass(field string) bool {
if m.Pass != nil {
for _, pass := range m.Pass {
if strings.HasPrefix(field, pass) {
return true
}
}
return false
}
if m.Drop != nil {
for _, drop := range m.Drop {
if strings.HasPrefix(field, drop) {
return false
}
}
return true
}
return true
}
func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {
for field := range fields {
if !m.shouldPass(field) {
delete(fields, field)
}
}
return fields
}
func (j *Jolokia) Gather(acc plugins.Accumulator) error {
context := j.Context //"/jolokia/read"
servers := j.Servers
metrics := j.Metrics
tags := j.Tags
if tags == nil {
tags = map[string]string{}
}
for _, server := range servers {
for _, metric := range metrics {
measurement := metric.Name
jmxPath := metric.Jmx
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
// Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath)
if err != nil {
return err
}
out, _ := j.getAttr(requestUrl)
if values, ok := out["value"]; ok {
switch values.(type) {
case map[string]interface{}:
acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags)
case interface{}:
acc.Add(measurement, values.(interface{}), tags)
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String())
}
}
}
return nil
}
func init() {
plugins.Add("jolokia", func() plugins.Plugin {
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}


@@ -0,0 +1,147 @@
package jolokia
import (
_ "fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
_ "github.com/stretchr/testify/require"
)
const validMultiValueJSON = `
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":{
"init":67108864,
"committed":456130560,
"max":477626368,
"used":203288528
},
"timestamp":1446129191,
"status":200
}`
const validSingleValueJSON = `
{
"request":{
"path":"used",
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":209274376,
"timestamp":1446129256,
"status":200
}`
const invalidJSON = "I don't think this is JSON"
const empty = ""
var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}}
var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}}
type jolokiaClientStub struct {
responseBody string
statusCode int
}
func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}
// Generates a pointer to a Jolokia object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// *Jolokia: Pointer to a Jolokia object that uses the generated mock HTTP client
func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia {
return &Jolokia{
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
Servers: servers,
Metrics: metrics,
}
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValue(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0,
"committed": 456130560.0,
"max": 477626368.0,
"used": 203288528.0}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueWithPass(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonSingleValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Points))
}


@@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string {
 var diskSampleConfig = `
 	# By default, telegraf gather stats for all mountpoints.
-	# Setting mountpoints will restrict the stats to the specified ones.
-	# mountpoints.
+	# Setting mountpoints will restrict the stats to the specified mountpoints.
 	# Mountpoints=["/"]
 `
@@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {
 type DiskIOStats struct {
 	ps PS
+	Devices          []string
+	SkipSerialNumber bool
 }

 func (_ *DiskIOStats) Description() string {
 	return "Read metrics about disk IO by device"
 }

-func (_ *DiskIOStats) SampleConfig() string { return "" }
+var diskIoSampleConfig = `
+	# By default, telegraf will gather stats for all devices including
+	# disk partitions.
+	# Setting devices will restrict the stats to the specified devices.
+	# Devices=["sda","sdb"]
+	# Uncomment the following line if you do not need disk serial numbers.
+	# SkipSerialNumber = true
+`
+
+func (_ *DiskIOStats) SampleConfig() string {
+	return diskIoSampleConfig
+}

 func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
 	diskio, err := s.ps.DiskIO()
@@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
 		return fmt.Errorf("error getting disk io info: %s", err)
 	}

+	var restrictDevices bool
+	devices := make(map[string]bool)
+	if len(s.Devices) != 0 {
+		restrictDevices = true
+		for _, dev := range s.Devices {
+			devices[dev] = true
+		}
+	}
+
 	for _, io := range diskio {
+		_, member := devices[io.Name]
+		if restrictDevices && !member {
+			continue
+		}
 		tags := map[string]string{}
 		if len(io.Name) != 0 {
 			tags["name"] = io.Name
 		}
-		if len(io.SerialNumber) != 0 {
+		if len(io.SerialNumber) != 0 && !s.SkipSerialNumber {
 			tags["serial"] = io.SerialNumber
 		}
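
The `Devices` option above works as a set-membership filter built once from the configured list before iterating the gathered IO counters. A self-contained sketch of the same pattern, with illustrative device names:

```go
package main

import "fmt"

func main() {
	// Configured devices (illustrative values, matching the sample config above).
	configured := []string{"sda", "sdb"}

	restrictDevices := len(configured) != 0
	devices := make(map[string]bool)
	for _, dev := range configured {
		devices[dev] = true
	}

	// Only configured devices are collected when the restriction is active.
	for _, name := range []string{"sda", "sdb", "sdc"} {
		if restrictDevices && !devices[name] {
			continue
		}
		fmt.Println("collecting disk io for", name)
	}
}
```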


@@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) {
 	mps.On("DiskUsage").Return(du, nil)

-	diskio := disk.DiskIOCountersStat{
+	diskio1 := disk.DiskIOCountersStat{
 		ReadCount:  888,
 		WriteCount: 5341,
 		ReadBytes:  100000,
@@ -84,8 +85,19 @@
 		IoTime:       123552,
 		SerialNumber: "ab-123-ad",
 	}
+	diskio2 := disk.DiskIOCountersStat{
+		ReadCount:    444,
+		WriteCount:   2341,
+		ReadBytes:    200000,
+		WriteBytes:   400000,
+		ReadTime:     3123,
+		WriteTime:    6087,
+		Name:         "sdb1",
+		IoTime:       246552,
+		SerialNumber: "bb-123-ad",
+	}

-	mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil)
+	mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil)

 	netio := net.NetIOCountersStat{
 		Name: "eth0",
@@ -262,21 +274,55 @@
 	assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
 	assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))

-	err = (&DiskIOStats{&mps}).Gather(&acc)
+	preDiskIOPoints := len(acc.Points)
+
+	err = (&DiskIOStats{ps: &mps}).Gather(&acc)
 	require.NoError(t, err)

-	dtags := map[string]string{
+	numDiskIOPoints := len(acc.Points) - preDiskIOPoints
+	expectedAllDiskIOPoints := 14
+	assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
+
+	dtags1 := map[string]string{
 		"name":   "sda1",
 		"serial": "ab-123-ad",
 	}
+	dtags2 := map[string]string{
+		"name":   "sdb1",
+		"serial": "bb-123-ad",
+	}

-	assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags))
-	assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags))
-	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags))
-	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags))
-	assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags))
-	assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags))
-	assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags))
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
+
+	// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
+	// and serial should be missing from the tags with SkipSerialNumber set
+	err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
+	assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points))
+
+	dtags3 := map[string]string{
+		"name": "sdb1",
+	}
+
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))

 	err = (&MemStats{&mps}).Gather(&acc)
 	require.NoError(t, err)


@@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) {
 	return nil, false
 }

+// CheckValue calls CheckFieldsValue passing a single-value map as fields
+func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
+	return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val})
+}
+
 // CheckValue checks that the accumulators point for the given measurement
 // is the same as the given value.
-func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
+func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool {
 	for _, p := range a.Points {
 		if p.Measurement == measurement {
-			return p.Values["value"] == val
+			return reflect.DeepEqual(fields, p.Values)
 		}
 	}
-	fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val)
+	fmt.Printf("CheckFieldsValue failed, measurement %s, fields %v", measurement, fields)
 	return false
 }

@@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue(
 	return a.ValidateTaggedValue(measurement, val, tags) == nil
 }

-// ValidateTaggedValue validates that the given measurement and value exist
-// in the accumulator and with the given tags.
+// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields
 func (a *Accumulator) ValidateTaggedValue(
 	measurement string,
 	val interface{},
 	tags map[string]string,
+) error {
+	return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags)
+}
+
+// ValidateValue calls ValidateTaggedValue
+func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
+	return a.ValidateTaggedValue(measurement, val, nil)
+}
+
+// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue
+func (a *Accumulator) CheckTaggedFieldsValue(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+) bool {
+	return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil
+}
+
+// ValidateTaggedFieldsValue validates that the given measurement and fields exist
+// in the accumulator and with the given tags.
+func (a *Accumulator) ValidateTaggedFieldsValue(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
 ) error {
 	if tags == nil {
 		tags = map[string]string{}
@@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue(
 		}

 		if p.Measurement == measurement {
-			if p.Values["value"] != val {
-				return fmt.Errorf("%v (%T) != %v (%T)",
-					p.Values["value"], p.Values["value"], val, val)
+			if !reflect.DeepEqual(fields, p.Values) {
+				return fmt.Errorf("%v != %v", fields, p.Values)
 			}
 			return nil
 		}
@@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue(
 	return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags)
 }

-// ValidateValue calls ValidateTaggedValue
-func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
-	return a.ValidateTaggedValue(measurement, val, nil)
+// ValidateFieldsValue calls ValidateTaggedFieldsValue
+func (a *Accumulator) ValidateFieldsValue(
+	measurement string,
+	fields map[string]interface{},
+) error {
+	return a.ValidateTaggedFieldsValue(measurement, fields, nil)
 }

 func (a *Accumulator) ValidateTaggedFields(
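
The refactor keeps the old single-value helpers as thin wrappers around the new field-map helpers, so existing tests keep working while new tests can assert a whole field set at once. An illustrative usage sketch (the measurement names, values and tags below are made up, not taken from an actual Telegraf test):

```go
package example

import (
	"testing"

	"github.com/influxdb/telegraf/testutil"
	"github.com/stretchr/testify/assert"
)

func TestAccumulatorHelpersSketch(t *testing.T) {
	var acc testutil.Accumulator

	// One single-value point and one multi-field point.
	acc.Add("load1", 0.5, nil)
	acc.AddFields("heap_memory_usage",
		map[string]interface{}{"used": 203288528.0, "max": 477626368.0},
		map[string]string{"server": "as1"})

	// The old helper still compares the implicit "value" field.
	assert.True(t, acc.CheckValue("load1", 0.5))

	// The new helpers compare the whole field map, optionally scoped to a tag set.
	assert.True(t, acc.CheckFieldsValue("heap_memory_usage",
		map[string]interface{}{"used": 203288528.0, "max": 477626368.0}))
	assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage",
		map[string]interface{}{"used": 203288528.0, "max": 477626368.0},
		map[string]string{"server": "as1"}))
}
```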