diff --git a/CHANGELOG.md b/CHANGELOG.md index b6cb15f89..5973fe713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ no longer insert field names when the field is simply named `value`. This is because the `value` field is redundant in the graphite/librato context. ### Features +- [#952](https://github.com/influxdata/telegraf/pull/952): Cassandra input plugin. Thanks @subhachandrachandra! - [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs. - [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener. - [#992](https://github.com/influxdata/telegraf/pull/992): Refactor allocations in TCP/UDP listeners. diff --git a/README.md b/README.md index 994c6803d..9c073f416 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,7 @@ Currently implemented sources: * [aerospike](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/aerospike) * [apache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/apache) * [bcache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/bcache) +* [cassandra](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/cassandra) * [couchbase](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchbase) * [couchdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchdb) * [disque](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/disque) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 2d784ca27..3f56ee541 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/inputs/couchbase" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" diff --git a/plugins/inputs/cassandra/README.md b/plugins/inputs/cassandra/README.md new file mode 100644 index 000000000..bfbcff77c --- /dev/null +++ b/plugins/inputs/cassandra/README.md @@ -0,0 +1,125 @@ +# Telegraf plugin: Cassandra + +#### Plugin arguments: +- **context** string: Context root used for jolokia url +- **servers** []string: List of servers with the format ":port" +- **metrics** []string: List of Jmx paths that identify mbeans attributes + +#### Description + +The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured. + +See: https://jolokia.org/ and [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html) + +# Measurements: +Cassandra plugin produces one or more measurements for each metric configured, adding Server's name as `host` tag. More than one measurement is generated when querying table metrics with a wildcard for the keyspace or table name. + +Given a configuration like: + +```toml +[[inputs.cassandra]] + context = "/jolokia/read" + servers = [":8778"] + metrics = ["/java.lang:type=Memory/HeapMemoryUsage"] +``` + +The collected metrics will be: + +``` +javaMemory,host=myHost,mname=HeapMemoryUsage HeapMemoryUsage_committed=1040187392,HeapMemoryUsage_init=1050673152,HeapMemoryUsage_max=1040187392,HeapMemoryUsage_used=368155000 1459551767230567084 +``` + +# Useful Metrics: + +Here is a list of metrics that might be useful to monitor your cassandra cluster. This was put together from multiple sources on the web. + +- [How to monitor Cassandra performance metrics](https://www.datadoghq.com/blog/how-to-monitor-cassandra-performance-metrics) +- [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html) + +####measurement = javaGarbageCollector + +- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime +- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount +- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime +- /java.lang:type=GarbageCollector,name=ParNew/CollectionCount + +####measurement = javaMemory + +- /java.lang:type=Memory/HeapMemoryUsage +- /java.lang:type=Memory/NonHeapMemoryUsage + +####measurement = cassandraCache + +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hit +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hit +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity + +####measurement = cassandraClient + +- /org.apache.cassandra.metrics:type=Client,name=connectedNativeClients + +####measurement = cassandraClientRequest + +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures + +####measurement = cassandraCommitLog + +- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks +- /org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize + +####measurement = cassandraCompaction + +- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTask +- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks +- /org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted +- /org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted + +####measurement = cassandraStorage + +- /org.apache.cassandra.metrics:type=Storage,name=Load +- /org.apache.cassandra.metrics:type=Storage,name=Exceptions + +####measurement = cassandraTable +Using wildcards for "keyspace" and "scope" can create a lot of series as metrics will be reported for every table and keyspace including internal system tables. Specify a keyspace name and/or a table name to limit them. + +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=LiveDiskSpaceUsed +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=TotalDiskSpaceUsed +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency + + +####measurement = cassandraThreadPools + +- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks + + diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go new file mode 100644 index 000000000..b7525de1a --- /dev/null +++ b/plugins/inputs/cassandra/cassandra.go @@ -0,0 +1,318 @@ +package cassandra + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "io/ioutil" + "net/http" + "net/url" + //"reflect" + "strings" +) + +/*type Server struct { + Host string + Username string + Password string + Port string +}*/ + +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type Cassandra struct { + jClient JolokiaClient + Context string + Servers []string + Metrics []string +} + +type javaMetric struct { + host string + metric string + acc telegraf.Accumulator +} + +type cassandraMetric struct { + host string + metric string + acc telegraf.Accumulator +} + +type jmxMetric interface { + addTagsFields(out map[string]interface{}) +} + +func addServerTags(host string, tags map[string]string) { + if host != "" && host != "localhost" && host != "127.0.0.1" { + tags["host"] = host + } +} + +func newJavaMetric(host string, metric string, + acc telegraf.Accumulator) *javaMetric { + return &javaMetric{host: host, metric: metric, acc: acc} +} + +func newCassandraMetric(host string, metric string, + acc telegraf.Accumulator) *cassandraMetric { + return &cassandraMetric{host: host, metric: metric, acc: acc} +} + +func addValuesAsFields(values map[string]interface{}, fields map[string]interface{}, + mname string) { + for k, v := range values { + if v != nil { + fields[mname+"_"+k] = v + } + } +} + +func parseJmxMetricRequest(mbean string) map[string]string { + tokens := make(map[string]string) + classAndPairs := strings.Split(mbean, ":") + if classAndPairs[0] == "org.apache.cassandra.metrics" { + tokens["class"] = "cassandra" + } else if classAndPairs[0] == "java.lang" { + tokens["class"] = "java" + } else { + return tokens + } + pairs := strings.Split(classAndPairs[1], ",") + for _, pair := range pairs { + p := strings.Split(pair, "=") + tokens[p[0]] = p[1] + } + return tokens +} + +func addTokensToTags(tokens map[string]string, tags map[string]string) { + for k, v := range tokens { + if k == "name" { + tags["mname"] = v // name seems to a reserved word in influxdb + } else if k == "class" || k == "type" { + continue // class and type are used in the metric name + } else { + tags[k] = v + } + } +} + +func (j javaMetric) addTagsFields(out map[string]interface{}) { + tags := make(map[string]string) + fields := make(map[string]interface{}) + + a := out["request"].(map[string]interface{}) + attribute := a["attribute"].(string) + mbean := a["mbean"].(string) + + tokens := parseJmxMetricRequest(mbean) + addTokensToTags(tokens, tags) + addServerTags(j.host, tags) + + if _, ok := tags["mname"]; !ok { + //Queries for a single value will not return a "name" tag in the response. + tags["mname"] = attribute + } + + if values, ok := out["value"]; ok { + switch t := values.(type) { + case map[string]interface{}: + addValuesAsFields(values.(map[string]interface{}), fields, attribute) + case interface{}: + fields[attribute] = t + } + j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + j.metric, out) + } +} + +func addCassandraMetric(mbean string, c cassandraMetric, + values map[string]interface{}) { + + tags := make(map[string]string) + fields := make(map[string]interface{}) + tokens := parseJmxMetricRequest(mbean) + addTokensToTags(tokens, tags) + addServerTags(c.host, tags) + addValuesAsFields(values, fields, tags["mname"]) + c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) + +} + +func (c cassandraMetric) addTagsFields(out map[string]interface{}) { + + r := out["request"] + + tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string)) + // Requests with wildcards for keyspace or table names will return nested + // maps in the json response + if tokens["type"] == "Table" && (tokens["keyspace"] == "*" || + tokens["scope"] == "*") { + if valuesMap, ok := out["value"]; ok { + for k, v := range valuesMap.(map[string]interface{}) { + addCassandraMetric(k, c, v.(map[string]interface{})) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + c.metric, out) + return + } + } else { + if values, ok := out["value"]; ok { + addCassandraMetric(r.(map[string]interface{})["mbean"].(string), + c, values.(map[string]interface{})) + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + c.metric, out) + return + } + } +} + +func (j *Cassandra) SampleConfig() string { + return ` + # This is the context root used to compose the jolokia url + context = "/jolokia/read" + ## List of cassandra servers exposing jolokia read service + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + ## List of metrics collected on above servers + ## Each metric consists of a jmx path. + ## This will collect all heap memory usage metrics from the jvm and + ## ReadLatency metrics for all keyspaces and tables. + ## "type=Table" in the query works with Cassandra3.0. Older versions might + ## need to use "type=ColumnFamily" + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" + ] +` +} + +func (j *Cassandra) Description() string { + return "Read Cassandra metrics through Jolokia" +} + +func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func parseServerTokens(server string) map[string]string { + serverTokens := make(map[string]string) + + hostAndUser := strings.Split(server, "@") + hostPort := "" + userPasswd := "" + if len(hostAndUser) == 2 { + hostPort = hostAndUser[1] + userPasswd = hostAndUser[0] + } else { + hostPort = hostAndUser[0] + } + hostTokens := strings.Split(hostPort, ":") + serverTokens["host"] = hostTokens[0] + serverTokens["port"] = hostTokens[1] + + if userPasswd != "" { + userTokens := strings.Split(userPasswd, ":") + serverTokens["user"] = userTokens[0] + serverTokens["passwd"] = userTokens[1] + } + return serverTokens +} + +func (c *Cassandra) Gather(acc telegraf.Accumulator) error { + context := c.Context + servers := c.Servers + metrics := c.Metrics + + for _, server := range servers { + for _, metric := range metrics { + var m jmxMetric + + serverTokens := parseServerTokens(server) + + if strings.HasPrefix(metric, "/java.lang:") { + m = newJavaMetric(serverTokens["host"], metric, acc) + } else if strings.HasPrefix(metric, + "/org.apache.cassandra.metrics:") { + m = newCassandraMetric(serverTokens["host"], metric, acc) + } + + // Prepare URL + requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + + serverTokens["port"] + context + metric) + if err != nil { + return err + } + if serverTokens["user"] != "" && serverTokens["passwd"] != "" { + requestUrl.User = url.UserPassword(serverTokens["user"], + serverTokens["passwd"]) + } + fmt.Printf("host %s url %s\n", serverTokens["host"], requestUrl) + + out, err := c.getAttr(requestUrl) + if out["status"] != 200.0 { + fmt.Printf("URL returned with status %v\n", out["status"]) + continue + } + m.addTagsFields(out) + } + } + return nil +} + +func init() { + inputs.Add("cassandra", func() telegraf.Input { + return &Cassandra{jClient: &JolokiaClientImpl{client: &http.Client{}}} + }) +} diff --git a/plugins/inputs/cassandra/cassandra_test.go b/plugins/inputs/cassandra/cassandra_test.go new file mode 100644 index 000000000..184fa3bbb --- /dev/null +++ b/plugins/inputs/cassandra/cassandra_test.go @@ -0,0 +1,286 @@ +package cassandra + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validJavaMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validCassandraMultiValueJSON = ` +{ + "request": { + "mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table,type=Table", + "type": "read"}, + "status": 200, + "timestamp": 1458089229, + "value": { + "999thPercentile": 20.0, + "99thPercentile": 10.0, + "Count": 400, + "DurationUnit": "microseconds", + "Max": 30.0, + "Mean": null, + "MeanRate": 3.0, + "Min": 1.0, + "RateUnit": "events/second", + "StdDev": null + } +}` + +const validCassandraNestedMultiValueJSON = ` +{ + "request": { + "mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=*,type=Table", + "type": "read"}, + "status": 200, + "timestamp": 1458089184, + "value": { + "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table1,type=Table": + { "999thPercentile": 1.0, + "Count": 100, + "DurationUnit": "microseconds", + "OneMinuteRate": 1.0, + "RateUnit": "events/second", + "StdDev": null + }, + "org.apache.cassandra.metrics:keyspace=test_keyspace2,name=ReadLatency,scope=test_table2,type=Table": + { "999thPercentile": 2.0, + "Count": 200, + "DurationUnit": "microseconds", + "OneMinuteRate": 2.0, + "RateUnit": "events/second", + "StdDev": null + } + } +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const validJavaMultiTypeJSON = ` +{ + "request":{ + "mbean":"java.lang:name=ConcurrentMarkSweep,type=GarbageCollector", + "attribute":"CollectionCount", + "type":"read" + }, + "value":1, + "timestamp":1459316570, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []string{"10.10.10.10:8778"} +var AuthServers = []string{"user:passwd@10.10.10.10:8778"} +var MultipleServers = []string{"10.10.10.10:8778", "10.10.10.11:8778"} +var HeapMetric = "/java.lang:type=Memory/HeapMemoryUsage" +var ReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency" +var NestedReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency" +var GarbageCollectorMetric1 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount" +var GarbageCollectorMetric2 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime" +var Context = "/jolokia/read" + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []string, metrics []string) *Cassandra { + return &Cassandra{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Context: Context, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected for class=Java +func TestHttpJsonJavaMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200, + MultipleServers, []string{HeapMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 2, len(acc.Metrics)) + + fields := map[string]interface{}{ + "HeapMemoryUsage_init": 67108864.0, + "HeapMemoryUsage_committed": 456130560.0, + "HeapMemoryUsage_max": 477626368.0, + "HeapMemoryUsage_used": 203288528.0, + } + tags1 := map[string]string{ + "host": "10.10.10.10", + "mname": "HeapMemoryUsage", + } + + tags2 := map[string]string{ + "host": "10.10.10.11", + "mname": "HeapMemoryUsage", + } + acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags1) + acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags2) +} + +func TestHttpJsonJavaMultiType(t *testing.T) { + cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, AuthServers, []string{GarbageCollectorMetric1, GarbageCollectorMetric2}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 2, len(acc.Metrics)) + + fields := map[string]interface{}{ + "CollectionCount": 1.0, + } + + tags := map[string]string{ + "host": "10.10.10.10", + "mname": "ConcurrentMarkSweep", + } + acc.AssertContainsTaggedFields(t, "javaGarbageCollector", fields, tags) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers, + []string{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Metrics)) +} + +// Test that the proper values are ignored or collected for class=Cassandra +func TestHttpJsonCassandraMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric}) + + var acc testutil.Accumulator + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Metrics)) + + fields := map[string]interface{}{ + "ReadLatency_999thPercentile": 20.0, + "ReadLatency_99thPercentile": 10.0, + "ReadLatency_Count": 400.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_Max": 30.0, + "ReadLatency_MeanRate": 3.0, + "ReadLatency_Min": 1.0, + "ReadLatency_RateUnit": "events/second", + } + + tags := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace1", + "scope": "test_table", + } + acc.AssertContainsTaggedFields(t, "cassandraTable", fields, tags) +} + +// Test that the proper values are ignored or collected for class=Cassandra with +// nested values +func TestHttpJsonCassandraNestedMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []string{NestedReadLatencyMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 2, len(acc.Metrics)) + + fields1 := map[string]interface{}{ + "ReadLatency_999thPercentile": 1.0, + "ReadLatency_Count": 100.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_OneMinuteRate": 1.0, + "ReadLatency_RateUnit": "events/second", + } + + fields2 := map[string]interface{}{ + "ReadLatency_999thPercentile": 2.0, + "ReadLatency_Count": 200.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_OneMinuteRate": 2.0, + "ReadLatency_RateUnit": "events/second", + } + + tags1 := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace1", + "scope": "test_table1", + } + + tags2 := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace2", + "scope": "test_table2", + } + + acc.AssertContainsTaggedFields(t, "cassandraTable", fields1, tags1) + acc.AssertContainsTaggedFields(t, "cassandraTable", fields2, tags2) +}