diff --git a/CHANGELOG.md b/CHANGELOG.md index c24ecd761..7e7d09df9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! - [#350](https://github.com/influxdb/telegraf/pull/350): Amon output. +- [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! + ### Bugfixes - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. diff --git a/README.md b/README.md index 864510069..5cae6ec41 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ Telegraf currently has support for collecting metrics from: * exec (generic JSON-emitting executable plugin) * haproxy * httpjson (generic JSON-emitting http service plugin) +* jolokia (remote JMX with JSON over HTTP) * kafka_consumer * leofs * lustre2 diff --git a/plugins/all/all.go b/plugins/all/all.go index f29a4987b..d714dd81e 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/httpjson" + _ "github.com/influxdb/telegraf/plugins/jolokia" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/bcache/bcache.go b/plugins/bcache/bcache.go index ee63f3c48..76e638ea4 100644 --- a/plugins/bcache/bcache.go +++ b/plugins/bcache/bcache.go @@ -1,6 +1,7 @@ package bcache import ( + "errors" "io/ioutil" "os" "path/filepath" @@ -34,17 +35,6 @@ func (b *Bcache) Description() string { return "Read metrics of bcache from stats_total and dirty_data" } -func getBackingDevs(bcachePath string) []string { - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 { - panic("Can't found any bcache device") - } - if err != nil { - panic(err) - } - return bdevs -} - func getTags(bdev string) map[string]string { backingDevFile, _ := os.Readlink(bdev) backingDevPath := strings.Split(backingDevFile, "/") @@ -83,11 +73,11 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { tags := getTags(bdev) metrics, err := filepath.Glob(bdev + "/stats_total/*") if len(metrics) < 0 { - panic("Can't read any stats file") + return errors.New("Can't read any stats file") } file, err := ioutil.ReadFile(bdev + "/dirty_data") if err != nil { - panic(err) + return err } rawValue := strings.TrimSpace(string(file)) value := prettyToBytes(rawValue) @@ -98,7 +88,7 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error { file, err := ioutil.ReadFile(path) rawValue := strings.TrimSpace(string(file)) if err != nil { - panic(err) + return err } if key == "bypassed" { value := prettyToBytes(rawValue) @@ -125,7 +115,11 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error { if len(bcachePath) == 0 { bcachePath = "/sys/fs/bcache" } - for _, bdev := range getBackingDevs(bcachePath) { + bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 { + return errors.New("Can't found any bcache device") + } + for _, bdev := range bdevs { if restrictDevs { bcacheDev := getTags(bdev)["bcache_dev"] if !bcacheDevsChecked[bcacheDev] { diff --git a/plugins/jolokia/README.md b/plugins/jolokia/README.md new file mode 100644 index 000000000..bda0c5f93 --- /dev/null +++ b/plugins/jolokia/README.md @@ -0,0 +1,51 @@ +# Telegraf plugin: Jolokia + +#### Plugin arguments: +- **context** string: Context root used of jolokia url +- **servers** []Server: List of servers + + **name** string: Server's logical name + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port +- **metrics** []Metric + + **name** string: Name of the measure + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values + + **drop** []string: Attributes to drop when collecting values + +#### Description + +The Jolokia plugin collects JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics +are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags. + +Given a configuration like: + +```ini +[jolokia] + +[[jolokia.servers]] + name = "as-service-1" + host = "127.0.0.1" + port = "8080" + +[[jolokia.servers]] + name = "as-service-2" + host = "127.0.0.1" + port = "8180" + +[[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + pass = ["used", "max"] +``` + +The collected metrics will be: + +``` +jolokia_heap_memory_usage name=as-service-1,host=127.0.0.1,port=8080 used=xxx,max=yyy +jolokia_heap_memory_usage name=as-service-2,host=127.0.0.1,port=8180 used=vvv,max=zzz +``` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go new file mode 100644 index 000000000..1ece12cf1 --- /dev/null +++ b/plugins/jolokia/jolokia.go @@ -0,0 +1,223 @@ +package jolokia + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/influxdb/telegraf/plugins" +) + +type Server struct { + Name string + Host string + Port string +} + +type Metric struct { + Name string + Jmx string + Pass []string + Drop []string +} + +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type Jolokia struct { + jClient JolokiaClient + Context string + Servers []Server + Metrics []Metric + Tags map[string]string +} + +func (j *Jolokia) SampleConfig() string { + return ` + # This is the context root used to compose the jolokia url + context = "/jolokia/read" + + # Tags added to each measurements + [jolokia.tags] + group = "as" + + # List of servers exposing jolokia read service + [[jolokia.servers]] + name = "stable" + host = "192.168.103.2" + port = "8180" + + # List of metrics collected on above servers + # Each metric consists in a name, a jmx path and either a pass or drop slice attributes + # This collect all heap memory usage metrics + [[jolokia.metrics]] + name = "heap_memory_usage" + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + + + # This drops the 'committed' value from Eden space measurement + [[jolokia.metrics]] + name = "memory_eden" + jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" + drop = [ "committed" ] + + + # This passes only DaemonThreadCount and ThreadCount + [[jolokia.metrics]] + name = "heap_threads" + jmx = "/java.lang:type=Threading" + pass = [ + "DaemonThreadCount", + "ThreadCount" + ] +` +} + +func (j *Jolokia) Description() string { + return "Read JMX metrics through Jolokia" +} + +func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (m *Metric) shouldPass(field string) bool { + + if m.Pass != nil { + + for _, pass := range m.Pass { + if strings.HasPrefix(field, pass) { + return true + } + } + + return false + } + + if m.Drop != nil { + + for _, drop := range m.Drop { + if strings.HasPrefix(field, drop) { + return false + } + } + + return true + } + + return true +} + +func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { + + for field, _ := range fields { + if !m.shouldPass(field) { + delete(fields, field) + } + } + + return fields +} + +func (j *Jolokia) Gather(acc plugins.Accumulator) error { + + context := j.Context //"/jolokia/read" + servers := j.Servers + metrics := j.Metrics + tags := j.Tags + + if tags == nil { + tags = map[string]string{} + } + + for _, server := range servers { + for _, metric := range metrics { + + measurement := metric.Name + jmxPath := metric.Jmx + + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) + if err != nil { + return err + } + + out, _ := j.getAttr(requestUrl) + + if values, ok := out["value"]; ok { + switch values.(type) { + case map[string]interface{}: + acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + case interface{}: + acc.Add(measurement, values.(interface{}), tags) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + } + } + } + + return nil +} + +func init() { + plugins.Add("jolokia", func() plugins.Plugin { + return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} + }) +} diff --git a/plugins/jolokia/jolokia_test.go b/plugins/jolokia/jolokia_test.go new file mode 100644 index 000000000..95df76e7b --- /dev/null +++ b/plugins/jolokia/jolokia_test.go @@ -0,0 +1,147 @@ +package jolokia + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} +var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia { + return &Jolokia{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValue(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, + "committed": 456130560.0, + "max": 477626368.0, + "used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueWithPass(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + + assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonMultiValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonSingleValueTags(t *testing.T) { + + jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Points)) + assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"})) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Points)) +} diff --git a/plugins/system/disk.go b/plugins/system/disk.go index 718d79949..784dcdb13 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -18,8 +18,7 @@ func (_ *DiskStats) Description() string { var diskSampleConfig = ` # By default, telegraf gather stats for all mountpoints. - # Setting mountpoints will restrict the stats to the specified ones. - # mountpoints. + # Setting mountpoints will restrict the stats to the specified mountpoints. # Mountpoints=["/"] ` @@ -64,13 +63,27 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error { type DiskIOStats struct { ps PS + + Devices []string + SkipSerialNumber bool } func (_ *DiskIOStats) Description() string { return "Read metrics about disk IO by device" } -func (_ *DiskIOStats) SampleConfig() string { return "" } +var diskIoSampleConfig = ` + # By default, telegraf will gather stats for all devices including + # disk partitions. + # Setting devices will restrict the stats to the specified devcies. + # Devices=["sda","sdb"] + # Uncomment the following line if you do not need disk serial numbers. + # SkipSerialNumber = true +` + +func (_ *DiskIOStats) SampleConfig() string { + return diskIoSampleConfig +} func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { diskio, err := s.ps.DiskIO() @@ -78,12 +91,25 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting disk io info: %s", err) } + var restrictDevices bool + devices := make(map[string]bool) + if len(s.Devices) != 0 { + restrictDevices = true + for _, dev := range s.Devices { + devices[dev] = true + } + } + for _, io := range diskio { + _, member := devices[io.Name] + if restrictDevices && !member { + continue + } tags := map[string]string{} if len(io.Name) != 0 { tags["name"] = io.Name } - if len(io.SerialNumber) != 0 { + if len(io.SerialNumber) != 0 && !s.SkipSerialNumber { tags["serial"] = io.SerialNumber } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 5839c8c60..389965e3b 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -73,7 +73,8 @@ func TestSystemStats_GenerateStats(t *testing.T) { mps.On("DiskUsage").Return(du, nil) - diskio := disk.DiskIOCountersStat{ + diskio1 := disk.DiskIOCountersStat{ + ReadCount: 888, WriteCount: 5341, ReadBytes: 100000, @@ -84,8 +85,19 @@ func TestSystemStats_GenerateStats(t *testing.T) { IoTime: 123552, SerialNumber: "ab-123-ad", } + diskio2 := disk.DiskIOCountersStat{ + ReadCount: 444, + WriteCount: 2341, + ReadBytes: 200000, + WriteBytes: 400000, + ReadTime: 3123, + WriteTime: 6087, + Name: "sdb1", + IoTime: 246552, + SerialNumber: "bb-123-ad", + } - mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio}, nil) + mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil) netio := net.NetIOCountersStat{ Name: "eth0", @@ -262,21 +274,55 @@ func TestSystemStats_GenerateStats(t *testing.T) { assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) - err = (&DiskIOStats{&mps}).Gather(&acc) + preDiskIOPoints := len(acc.Points) + + err = (&DiskIOStats{ps: &mps}).Gather(&acc) require.NoError(t, err) - dtags := map[string]string{ + numDiskIOPoints := len(acc.Points) - preDiskIOPoints + expectedAllDiskIOPoints := 14 + assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) + + dtags1 := map[string]string{ "name": "sda1", "serial": "ab-123-ad", } + dtags2 := map[string]string{ + "name": "sdb1", + "serial": "bb-123-ad", + } - assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) + + // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" + // and serial should be missing from the tags with SkipSerialNumber set + err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) + assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points)) + + dtags3 := map[string]string{ + "name": "sdb1", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) err = (&MemStats{&mps}).Gather(&acc) require.NoError(t, err) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 361f81e9d..d8f44ddf8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -106,15 +106,20 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue calls CheckFieldsValue passing a single-value map as fields +func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { + return a.CheckFieldsValue(measurement, map[string]interface{}{"value": val}) +} + // CheckValue checks that the accumulators point for the given measurement // is the same as the given value. -func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { +func (a *Accumulator) CheckFieldsValue(measurement string, fields map[string]interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Values["value"] == val + return reflect.DeepEqual(fields, p.Values) } } - fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) + fmt.Printf("CheckFieldsValue failed, measurement %s, fields %s", measurement, fields) return false } @@ -127,12 +132,35 @@ func (a *Accumulator) CheckTaggedValue( return a.ValidateTaggedValue(measurement, val, tags) == nil } -// ValidateTaggedValue validates that the given measurement and value exist -// in the accumulator and with the given tags. +// ValidateTaggedValue calls ValidateTaggedFieldsValue passing a single-value map as fields func (a *Accumulator) ValidateTaggedValue( measurement string, val interface{}, tags map[string]string, +) error { + return a.ValidateTaggedFieldsValue(measurement, map[string]interface{}{"value": val}, tags) +} + +// ValidateValue calls ValidateTaggedValue +func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { + return a.ValidateTaggedValue(measurement, val, nil) +} + +// CheckTaggedFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) CheckTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, +) bool { + return a.ValidateTaggedFieldsValue(measurement, fields, tags) == nil +} + +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedFieldsValue( + measurement string, + fields map[string]interface{}, + tags map[string]string, ) error { if tags == nil { tags = map[string]string{} @@ -143,9 +171,8 @@ func (a *Accumulator) ValidateTaggedValue( } if p.Measurement == measurement { - if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", - p.Values["value"], p.Values["value"], val, val) + if !reflect.DeepEqual(fields, p.Values) { + return fmt.Errorf("%v != %v ", fields, p.Values) } return nil } @@ -154,9 +181,12 @@ func (a *Accumulator) ValidateTaggedValue( return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } -// ValidateValue calls ValidateTaggedValue -func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { - return a.ValidateTaggedValue(measurement, val, nil) +// ValidateFieldsValue calls ValidateTaggedFieldsValue +func (a *Accumulator) ValidateFieldsValue( + measurement string, + fields map[string]interface{}, +) error { + return a.ValidateTaggedValue(measurement, fields, nil) } func (a *Accumulator) ValidateTaggedFields(