Added cassandra plugin to access metrics using jolokia and push them to influxdb.
This commit is contained in:
parent
ae8cf8c35e
commit
ed809fed68
|
@ -9,6 +9,7 @@ no longer insert field names when the field is simply named `value`. This is
|
||||||
because the `value` field is redundant in the graphite/librato context.
|
because the `value` field is redundant in the graphite/librato context.
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
- [#952](https://github.com/influxdata/telegraf/pull/952): Cassandra input plugin. Thanks @subhachandrachandra!
|
||||||
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
|
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
|
||||||
- [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener.
|
- [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener.
|
||||||
- [#992](https://github.com/influxdata/telegraf/pull/992): Refactor allocations in TCP/UDP listeners.
|
- [#992](https://github.com/influxdata/telegraf/pull/992): Refactor allocations in TCP/UDP listeners.
|
||||||
|
|
|
@ -160,6 +160,7 @@ Currently implemented sources:
|
||||||
* [aerospike](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/aerospike)
|
* [aerospike](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/aerospike)
|
||||||
* [apache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/apache)
|
* [apache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/apache)
|
||||||
* [bcache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/bcache)
|
* [bcache](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/bcache)
|
||||||
|
* [cassandra](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/cassandra)
|
||||||
* [couchbase](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchbase)
|
* [couchbase](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchbase)
|
||||||
* [couchdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchdb)
|
* [couchdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/couchdb)
|
||||||
* [disque](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/disque)
|
* [disque](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/disque)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
|
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
|
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
# Telegraf plugin: Cassandra
|
||||||
|
|
||||||
|
#### Plugin arguments:
|
||||||
|
- **context** string: Context root used for jolokia url
|
||||||
|
- **servers** []string: List of servers with the format "<user:passwd@><host>:port"
|
||||||
|
- **metrics** []string: List of Jmx paths that identify mbeans attributes
|
||||||
|
|
||||||
|
#### Description
|
||||||
|
|
||||||
|
The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured.
|
||||||
|
|
||||||
|
See: https://jolokia.org/ and [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html)
|
||||||
|
|
||||||
|
# Measurements:
|
||||||
|
Cassandra plugin produces one or more measurements for each metric configured, adding Server's name as `host` tag. More than one measurement is generated when querying table metrics with a wildcard for the keyspace or table name.
|
||||||
|
|
||||||
|
Given a configuration like:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.cassandra]]
|
||||||
|
context = "/jolokia/read"
|
||||||
|
servers = [":8778"]
|
||||||
|
metrics = ["/java.lang:type=Memory/HeapMemoryUsage"]
|
||||||
|
```
|
||||||
|
|
||||||
|
The collected metrics will be:
|
||||||
|
|
||||||
|
```
|
||||||
|
javaMemory,host=myHost,mname=HeapMemoryUsage HeapMemoryUsage_committed=1040187392,HeapMemoryUsage_init=1050673152,HeapMemoryUsage_max=1040187392,HeapMemoryUsage_used=368155000 1459551767230567084
|
||||||
|
```
|
||||||
|
|
||||||
|
# Useful Metrics:
|
||||||
|
|
||||||
|
Here is a list of metrics that might be useful to monitor your cassandra cluster. This was put together from multiple sources on the web.
|
||||||
|
|
||||||
|
- [How to monitor Cassandra performance metrics](https://www.datadoghq.com/blog/how-to-monitor-cassandra-performance-metrics)
|
||||||
|
- [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html)
|
||||||
|
|
||||||
|
####measurement = javaGarbageCollector
|
||||||
|
|
||||||
|
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime
|
||||||
|
- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount
|
||||||
|
- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime
|
||||||
|
- /java.lang:type=GarbageCollector,name=ParNew/CollectionCount
|
||||||
|
|
||||||
|
####measurement = javaMemory
|
||||||
|
|
||||||
|
- /java.lang:type=Memory/HeapMemoryUsage
|
||||||
|
- /java.lang:type=Memory/NonHeapMemoryUsage
|
||||||
|
|
||||||
|
####measurement = cassandraCache
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hit
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hit
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size
|
||||||
|
- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity
|
||||||
|
|
||||||
|
####measurement = cassandraClient
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=Client,name=connectedNativeClients
|
||||||
|
|
||||||
|
####measurement = cassandraClientRequest
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures
|
||||||
|
- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures
|
||||||
|
|
||||||
|
####measurement = cassandraCommitLog
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize
|
||||||
|
|
||||||
|
####measurement = cassandraCompaction
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTask
|
||||||
|
- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted
|
||||||
|
- /org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted
|
||||||
|
|
||||||
|
####measurement = cassandraStorage
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=Storage,name=Load
|
||||||
|
- /org.apache.cassandra.metrics:type=Storage,name=Exceptions
|
||||||
|
|
||||||
|
####measurement = cassandraTable
|
||||||
|
Using wildcards for "keyspace" and "scope" can create a lot of series as metrics will be reported for every table and keyspace including internal system tables. Specify a keyspace name and/or a table name to limit them.
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=LiveDiskSpaceUsed
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=TotalDiskSpaceUsed
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency
|
||||||
|
- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency
|
||||||
|
|
||||||
|
|
||||||
|
####measurement = cassandraThreadPools
|
||||||
|
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks
|
||||||
|
- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,318 @@
|
||||||
|
package cassandra
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
//"reflect"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*type Server struct {
|
||||||
|
Host string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
Port string
|
||||||
|
}*/
|
||||||
|
|
||||||
|
type JolokiaClient interface {
|
||||||
|
MakeRequest(req *http.Request) (*http.Response, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type JolokiaClientImpl struct {
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
|
||||||
|
return c.client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Cassandra struct {
|
||||||
|
jClient JolokiaClient
|
||||||
|
Context string
|
||||||
|
Servers []string
|
||||||
|
Metrics []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type javaMetric struct {
|
||||||
|
host string
|
||||||
|
metric string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
type cassandraMetric struct {
|
||||||
|
host string
|
||||||
|
metric string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
type jmxMetric interface {
|
||||||
|
addTagsFields(out map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func addServerTags(host string, tags map[string]string) {
|
||||||
|
if host != "" && host != "localhost" && host != "127.0.0.1" {
|
||||||
|
tags["host"] = host
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJavaMetric(host string, metric string,
|
||||||
|
acc telegraf.Accumulator) *javaMetric {
|
||||||
|
return &javaMetric{host: host, metric: metric, acc: acc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCassandraMetric(host string, metric string,
|
||||||
|
acc telegraf.Accumulator) *cassandraMetric {
|
||||||
|
return &cassandraMetric{host: host, metric: metric, acc: acc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
|
||||||
|
mname string) {
|
||||||
|
for k, v := range values {
|
||||||
|
if v != nil {
|
||||||
|
fields[mname+"_"+k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseJmxMetricRequest(mbean string) map[string]string {
|
||||||
|
tokens := make(map[string]string)
|
||||||
|
classAndPairs := strings.Split(mbean, ":")
|
||||||
|
if classAndPairs[0] == "org.apache.cassandra.metrics" {
|
||||||
|
tokens["class"] = "cassandra"
|
||||||
|
} else if classAndPairs[0] == "java.lang" {
|
||||||
|
tokens["class"] = "java"
|
||||||
|
} else {
|
||||||
|
return tokens
|
||||||
|
}
|
||||||
|
pairs := strings.Split(classAndPairs[1], ",")
|
||||||
|
for _, pair := range pairs {
|
||||||
|
p := strings.Split(pair, "=")
|
||||||
|
tokens[p[0]] = p[1]
|
||||||
|
}
|
||||||
|
return tokens
|
||||||
|
}
|
||||||
|
|
||||||
|
func addTokensToTags(tokens map[string]string, tags map[string]string) {
|
||||||
|
for k, v := range tokens {
|
||||||
|
if k == "name" {
|
||||||
|
tags["mname"] = v // name seems to a reserved word in influxdb
|
||||||
|
} else if k == "class" || k == "type" {
|
||||||
|
continue // class and type are used in the metric name
|
||||||
|
} else {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j javaMetric) addTagsFields(out map[string]interface{}) {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
|
||||||
|
a := out["request"].(map[string]interface{})
|
||||||
|
attribute := a["attribute"].(string)
|
||||||
|
mbean := a["mbean"].(string)
|
||||||
|
|
||||||
|
tokens := parseJmxMetricRequest(mbean)
|
||||||
|
addTokensToTags(tokens, tags)
|
||||||
|
addServerTags(j.host, tags)
|
||||||
|
|
||||||
|
if _, ok := tags["mname"]; !ok {
|
||||||
|
//Queries for a single value will not return a "name" tag in the response.
|
||||||
|
tags["mname"] = attribute
|
||||||
|
}
|
||||||
|
|
||||||
|
if values, ok := out["value"]; ok {
|
||||||
|
switch t := values.(type) {
|
||||||
|
case map[string]interface{}:
|
||||||
|
addValuesAsFields(values.(map[string]interface{}), fields, attribute)
|
||||||
|
case interface{}:
|
||||||
|
fields[attribute] = t
|
||||||
|
}
|
||||||
|
j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
|
||||||
|
j.metric, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addCassandraMetric(mbean string, c cassandraMetric,
|
||||||
|
values map[string]interface{}) {
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tokens := parseJmxMetricRequest(mbean)
|
||||||
|
addTokensToTags(tokens, tags)
|
||||||
|
addServerTags(c.host, tags)
|
||||||
|
addValuesAsFields(values, fields, tags["mname"])
|
||||||
|
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
|
||||||
|
|
||||||
|
r := out["request"]
|
||||||
|
|
||||||
|
tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string))
|
||||||
|
// Requests with wildcards for keyspace or table names will return nested
|
||||||
|
// maps in the json response
|
||||||
|
if tokens["type"] == "Table" && (tokens["keyspace"] == "*" ||
|
||||||
|
tokens["scope"] == "*") {
|
||||||
|
if valuesMap, ok := out["value"]; ok {
|
||||||
|
for k, v := range valuesMap.(map[string]interface{}) {
|
||||||
|
addCassandraMetric(k, c, v.(map[string]interface{}))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
|
||||||
|
c.metric, out)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if values, ok := out["value"]; ok {
|
||||||
|
addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
|
||||||
|
c, values.(map[string]interface{}))
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
|
||||||
|
c.metric, out)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Cassandra) SampleConfig() string {
|
||||||
|
return `
|
||||||
|
# This is the context root used to compose the jolokia url
|
||||||
|
context = "/jolokia/read"
|
||||||
|
## List of cassandra servers exposing jolokia read service
|
||||||
|
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
|
||||||
|
## List of metrics collected on above servers
|
||||||
|
## Each metric consists of a jmx path.
|
||||||
|
## This will collect all heap memory usage metrics from the jvm and
|
||||||
|
## ReadLatency metrics for all keyspaces and tables.
|
||||||
|
## "type=Table" in the query works with Cassandra3.0. Older versions might
|
||||||
|
## need to use "type=ColumnFamily"
|
||||||
|
metrics = [
|
||||||
|
"/java.lang:type=Memory/HeapMemoryUsage",
|
||||||
|
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
|
||||||
|
]
|
||||||
|
`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Cassandra) Description() string {
|
||||||
|
return "Read Cassandra metrics through Jolokia"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
||||||
|
// Create + send request
|
||||||
|
req, err := http.NewRequest("GET", requestUrl.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := j.jClient.MakeRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Process response
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
||||||
|
requestUrl,
|
||||||
|
resp.StatusCode,
|
||||||
|
http.StatusText(resp.StatusCode),
|
||||||
|
http.StatusOK,
|
||||||
|
http.StatusText(http.StatusOK))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// read body
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal json
|
||||||
|
var jsonOut map[string]interface{}
|
||||||
|
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
||||||
|
return nil, errors.New("Error decoding JSON response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonOut, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseServerTokens(server string) map[string]string {
|
||||||
|
serverTokens := make(map[string]string)
|
||||||
|
|
||||||
|
hostAndUser := strings.Split(server, "@")
|
||||||
|
hostPort := ""
|
||||||
|
userPasswd := ""
|
||||||
|
if len(hostAndUser) == 2 {
|
||||||
|
hostPort = hostAndUser[1]
|
||||||
|
userPasswd = hostAndUser[0]
|
||||||
|
} else {
|
||||||
|
hostPort = hostAndUser[0]
|
||||||
|
}
|
||||||
|
hostTokens := strings.Split(hostPort, ":")
|
||||||
|
serverTokens["host"] = hostTokens[0]
|
||||||
|
serverTokens["port"] = hostTokens[1]
|
||||||
|
|
||||||
|
if userPasswd != "" {
|
||||||
|
userTokens := strings.Split(userPasswd, ":")
|
||||||
|
serverTokens["user"] = userTokens[0]
|
||||||
|
serverTokens["passwd"] = userTokens[1]
|
||||||
|
}
|
||||||
|
return serverTokens
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
|
||||||
|
context := c.Context
|
||||||
|
servers := c.Servers
|
||||||
|
metrics := c.Metrics
|
||||||
|
|
||||||
|
for _, server := range servers {
|
||||||
|
for _, metric := range metrics {
|
||||||
|
var m jmxMetric
|
||||||
|
|
||||||
|
serverTokens := parseServerTokens(server)
|
||||||
|
|
||||||
|
if strings.HasPrefix(metric, "/java.lang:") {
|
||||||
|
m = newJavaMetric(serverTokens["host"], metric, acc)
|
||||||
|
} else if strings.HasPrefix(metric,
|
||||||
|
"/org.apache.cassandra.metrics:") {
|
||||||
|
m = newCassandraMetric(serverTokens["host"], metric, acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare URL
|
||||||
|
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
|
||||||
|
serverTokens["port"] + context + metric)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if serverTokens["user"] != "" && serverTokens["passwd"] != "" {
|
||||||
|
requestUrl.User = url.UserPassword(serverTokens["user"],
|
||||||
|
serverTokens["passwd"])
|
||||||
|
}
|
||||||
|
fmt.Printf("host %s url %s\n", serverTokens["host"], requestUrl)
|
||||||
|
|
||||||
|
out, err := c.getAttr(requestUrl)
|
||||||
|
if out["status"] != 200.0 {
|
||||||
|
fmt.Printf("URL returned with status %v\n", out["status"])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
m.addTagsFields(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("cassandra", func() telegraf.Input {
|
||||||
|
return &Cassandra{jClient: &JolokiaClientImpl{client: &http.Client{}}}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,286 @@
|
||||||
|
package cassandra
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
_ "github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const validJavaMultiValueJSON = `
|
||||||
|
{
|
||||||
|
"request":{
|
||||||
|
"mbean":"java.lang:type=Memory",
|
||||||
|
"attribute":"HeapMemoryUsage",
|
||||||
|
"type":"read"
|
||||||
|
},
|
||||||
|
"value":{
|
||||||
|
"init":67108864,
|
||||||
|
"committed":456130560,
|
||||||
|
"max":477626368,
|
||||||
|
"used":203288528
|
||||||
|
},
|
||||||
|
"timestamp":1446129191,
|
||||||
|
"status":200
|
||||||
|
}`
|
||||||
|
|
||||||
|
const validCassandraMultiValueJSON = `
|
||||||
|
{
|
||||||
|
"request": {
|
||||||
|
"mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table,type=Table",
|
||||||
|
"type": "read"},
|
||||||
|
"status": 200,
|
||||||
|
"timestamp": 1458089229,
|
||||||
|
"value": {
|
||||||
|
"999thPercentile": 20.0,
|
||||||
|
"99thPercentile": 10.0,
|
||||||
|
"Count": 400,
|
||||||
|
"DurationUnit": "microseconds",
|
||||||
|
"Max": 30.0,
|
||||||
|
"Mean": null,
|
||||||
|
"MeanRate": 3.0,
|
||||||
|
"Min": 1.0,
|
||||||
|
"RateUnit": "events/second",
|
||||||
|
"StdDev": null
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
const validCassandraNestedMultiValueJSON = `
|
||||||
|
{
|
||||||
|
"request": {
|
||||||
|
"mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=*,type=Table",
|
||||||
|
"type": "read"},
|
||||||
|
"status": 200,
|
||||||
|
"timestamp": 1458089184,
|
||||||
|
"value": {
|
||||||
|
"org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table1,type=Table":
|
||||||
|
{ "999thPercentile": 1.0,
|
||||||
|
"Count": 100,
|
||||||
|
"DurationUnit": "microseconds",
|
||||||
|
"OneMinuteRate": 1.0,
|
||||||
|
"RateUnit": "events/second",
|
||||||
|
"StdDev": null
|
||||||
|
},
|
||||||
|
"org.apache.cassandra.metrics:keyspace=test_keyspace2,name=ReadLatency,scope=test_table2,type=Table":
|
||||||
|
{ "999thPercentile": 2.0,
|
||||||
|
"Count": 200,
|
||||||
|
"DurationUnit": "microseconds",
|
||||||
|
"OneMinuteRate": 2.0,
|
||||||
|
"RateUnit": "events/second",
|
||||||
|
"StdDev": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
const validSingleValueJSON = `
|
||||||
|
{
|
||||||
|
"request":{
|
||||||
|
"path":"used",
|
||||||
|
"mbean":"java.lang:type=Memory",
|
||||||
|
"attribute":"HeapMemoryUsage",
|
||||||
|
"type":"read"
|
||||||
|
},
|
||||||
|
"value":209274376,
|
||||||
|
"timestamp":1446129256,
|
||||||
|
"status":200
|
||||||
|
}`
|
||||||
|
|
||||||
|
const validJavaMultiTypeJSON = `
|
||||||
|
{
|
||||||
|
"request":{
|
||||||
|
"mbean":"java.lang:name=ConcurrentMarkSweep,type=GarbageCollector",
|
||||||
|
"attribute":"CollectionCount",
|
||||||
|
"type":"read"
|
||||||
|
},
|
||||||
|
"value":1,
|
||||||
|
"timestamp":1459316570,
|
||||||
|
"status":200
|
||||||
|
}`
|
||||||
|
|
||||||
|
const invalidJSON = "I don't think this is JSON"
|
||||||
|
|
||||||
|
const empty = ""
|
||||||
|
|
||||||
|
var Servers = []string{"10.10.10.10:8778"}
|
||||||
|
var AuthServers = []string{"user:passwd@10.10.10.10:8778"}
|
||||||
|
var MultipleServers = []string{"10.10.10.10:8778", "10.10.10.11:8778"}
|
||||||
|
var HeapMetric = "/java.lang:type=Memory/HeapMemoryUsage"
|
||||||
|
var ReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"
|
||||||
|
var NestedReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"
|
||||||
|
var GarbageCollectorMetric1 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"
|
||||||
|
var GarbageCollectorMetric2 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime"
|
||||||
|
var Context = "/jolokia/read"
|
||||||
|
|
||||||
|
type jolokiaClientStub struct {
|
||||||
|
responseBody string
|
||||||
|
statusCode int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) {
|
||||||
|
resp := http.Response{}
|
||||||
|
resp.StatusCode = c.statusCode
|
||||||
|
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
|
||||||
|
return &resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generates a pointer to an HttpJson object that uses a mock HTTP client.
|
||||||
|
// Parameters:
|
||||||
|
// response : Body of the response that the mock HTTP client should return
|
||||||
|
// statusCode: HTTP status code the mock HTTP client should return
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
|
||||||
|
func genJolokiaClientStub(response string, statusCode int, servers []string, metrics []string) *Cassandra {
|
||||||
|
return &Cassandra{
|
||||||
|
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
|
||||||
|
Context: Context,
|
||||||
|
Servers: servers,
|
||||||
|
Metrics: metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that the proper values are ignored or collected for class=Java
|
||||||
|
func TestHttpJsonJavaMultiValue(t *testing.T) {
|
||||||
|
cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200,
|
||||||
|
MultipleServers, []string{HeapMetric})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
acc.SetDebug(true)
|
||||||
|
err := cassandra.Gather(&acc)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 2, len(acc.Metrics))
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"HeapMemoryUsage_init": 67108864.0,
|
||||||
|
"HeapMemoryUsage_committed": 456130560.0,
|
||||||
|
"HeapMemoryUsage_max": 477626368.0,
|
||||||
|
"HeapMemoryUsage_used": 203288528.0,
|
||||||
|
}
|
||||||
|
tags1 := map[string]string{
|
||||||
|
"host": "10.10.10.10",
|
||||||
|
"mname": "HeapMemoryUsage",
|
||||||
|
}
|
||||||
|
|
||||||
|
tags2 := map[string]string{
|
||||||
|
"host": "10.10.10.11",
|
||||||
|
"mname": "HeapMemoryUsage",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHttpJsonJavaMultiType(t *testing.T) {
|
||||||
|
cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, AuthServers, []string{GarbageCollectorMetric1, GarbageCollectorMetric2})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
acc.SetDebug(true)
|
||||||
|
err := cassandra.Gather(&acc)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 2, len(acc.Metrics))
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"CollectionCount": 1.0,
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "10.10.10.10",
|
||||||
|
"mname": "ConcurrentMarkSweep",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "javaGarbageCollector", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that the proper values are ignored or collected
|
||||||
|
func TestHttpJsonOn404(t *testing.T) {
|
||||||
|
|
||||||
|
jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers,
|
||||||
|
[]string{HeapMetric})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := jolokia.Gather(&acc)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 0, len(acc.Metrics))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that the proper values are ignored or collected for class=Cassandra
|
||||||
|
func TestHttpJsonCassandraMultiValue(t *testing.T) {
|
||||||
|
cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := cassandra.Gather(&acc)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 1, len(acc.Metrics))
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"ReadLatency_999thPercentile": 20.0,
|
||||||
|
"ReadLatency_99thPercentile": 10.0,
|
||||||
|
"ReadLatency_Count": 400.0,
|
||||||
|
"ReadLatency_DurationUnit": "microseconds",
|
||||||
|
"ReadLatency_Max": 30.0,
|
||||||
|
"ReadLatency_MeanRate": 3.0,
|
||||||
|
"ReadLatency_Min": 1.0,
|
||||||
|
"ReadLatency_RateUnit": "events/second",
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "10.10.10.10",
|
||||||
|
"mname": "ReadLatency",
|
||||||
|
"keyspace": "test_keyspace1",
|
||||||
|
"scope": "test_table",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "cassandraTable", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that the proper values are ignored or collected for class=Cassandra with
|
||||||
|
// nested values
|
||||||
|
func TestHttpJsonCassandraNestedMultiValue(t *testing.T) {
|
||||||
|
cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []string{NestedReadLatencyMetric})
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
acc.SetDebug(true)
|
||||||
|
err := cassandra.Gather(&acc)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 2, len(acc.Metrics))
|
||||||
|
|
||||||
|
fields1 := map[string]interface{}{
|
||||||
|
"ReadLatency_999thPercentile": 1.0,
|
||||||
|
"ReadLatency_Count": 100.0,
|
||||||
|
"ReadLatency_DurationUnit": "microseconds",
|
||||||
|
"ReadLatency_OneMinuteRate": 1.0,
|
||||||
|
"ReadLatency_RateUnit": "events/second",
|
||||||
|
}
|
||||||
|
|
||||||
|
fields2 := map[string]interface{}{
|
||||||
|
"ReadLatency_999thPercentile": 2.0,
|
||||||
|
"ReadLatency_Count": 200.0,
|
||||||
|
"ReadLatency_DurationUnit": "microseconds",
|
||||||
|
"ReadLatency_OneMinuteRate": 2.0,
|
||||||
|
"ReadLatency_RateUnit": "events/second",
|
||||||
|
}
|
||||||
|
|
||||||
|
tags1 := map[string]string{
|
||||||
|
"host": "10.10.10.10",
|
||||||
|
"mname": "ReadLatency",
|
||||||
|
"keyspace": "test_keyspace1",
|
||||||
|
"scope": "test_table1",
|
||||||
|
}
|
||||||
|
|
||||||
|
tags2 := map[string]string{
|
||||||
|
"host": "10.10.10.10",
|
||||||
|
"mname": "ReadLatency",
|
||||||
|
"keyspace": "test_keyspace2",
|
||||||
|
"scope": "test_table2",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "cassandraTable", fields1, tags1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "cassandraTable", fields2, tags2)
|
||||||
|
}
|
Loading…
Reference in New Issue