From 487c2cabfe01f41947bcb3d1ecf038a40e5ded98 Mon Sep 17 00:00:00 2001 From: subhachandrachandra Date: Fri, 1 Apr 2016 16:13:21 -0700 Subject: [PATCH] Added cassandra plugin to access metrics using jolokia and push them to influxdb. Conflicts: plugins/inputs/all/all.go --- plugins/inputs/all/all.go | 1 + plugins/inputs/cassandra/README.md | 41 +++ plugins/inputs/cassandra/cassandra.go | 307 +++++++++++++++++++++ plugins/inputs/cassandra/cassandra_test.go | 276 ++++++++++++++++++ 4 files changed, 625 insertions(+) create mode 100644 plugins/inputs/cassandra/README.md create mode 100644 plugins/inputs/cassandra/cassandra.go create mode 100644 plugins/inputs/cassandra/cassandra_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 4f7d45f60..b73fac840 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" _ "github.com/influxdata/telegraf/plugins/inputs/couchbase" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/disque" diff --git a/plugins/inputs/cassandra/README.md b/plugins/inputs/cassandra/README.md new file mode 100644 index 000000000..f7ec4c715 --- /dev/null +++ b/plugins/inputs/cassandra/README.md @@ -0,0 +1,41 @@ +# Telegraf plugin: Cassandra + +#### Plugin arguments: +- **context** string: Context root used for jolokia url +- **servers** []Server: List of servers + + **host** string: Server's ip address or hostname + + **port** string: Server's listening port + + **username** string: Server's username for authentication (optional) + + **password** string: Server's password for authentication (optional) +- **metrics** []Metric + + **jmx** string: Jmx path that identifies mbeans attributes + + **pass** []string: Attributes to retain when collecting values (TODO) + + **drop** []string: Attributes to drop when collecting values (TODO) + +#### Description + +The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured. + +See: https://jolokia.org/ + +# Measurements: +Cassandra plugin produces one or more measurements for each metric configured, adding Server's name as `host` tag. More than one measurement is generated when querying table metrics with a wildcard for the keyspace or table name. + +Given a configuration like: + +```ini +[cassandra] + +[[cassandra.servers]] + host = "127.0.0.1" + port = "878" + +[[cassandra.metrics]] + jmx = "/java.lang:type=Memory/HeapMemoryUsage" +``` + +The collected metrics will be: + +``` +javaMemory,host=myHost,mname=HeapMemoryUsage HeapMemoryUsage_committed=1040187392,HeapMemoryUsage_init=1050673152,HeapMemoryUsage_max=1040187392,HeapMemoryUsage_used=368155000 1459551767230567084 +``` diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go new file mode 100644 index 000000000..ce9230ff3 --- /dev/null +++ b/plugins/inputs/cassandra/cassandra.go @@ -0,0 +1,307 @@ +package cassandra + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "io/ioutil" + "net/http" + "net/url" + //"reflect" + "strings" +) + +type Server struct { + Host string + Username string + Password string + Port string +} + +type Metric struct { + Jmx string +} + +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type Cassandra struct { + jClient JolokiaClient + Context string + Servers []Server + Metrics []Metric +} + +type javaMetric struct { + server Server + metric Metric + acc telegraf.Accumulator +} + +type cassandraMetric struct { + server Server + metric Metric + acc telegraf.Accumulator +} + +type jmxMetric interface { + addTagsFields(out map[string]interface{}) +} + +func addServerTags(server Server, tags map[string]string) { + if server.Host != "" && server.Host != "localhost" && + server.Host != "127.0.0.1" { + tags["host"] = server.Host + } +} + +func newJavaMetric(server Server, metric Metric, + acc telegraf.Accumulator) *javaMetric { + return &javaMetric{server: server, metric: metric, acc: acc} +} + +func newCassandraMetric(server Server, metric Metric, + acc telegraf.Accumulator) *cassandraMetric { + return &cassandraMetric{server: server, metric: metric, acc: acc} +} + +func addValuesAsFields(values map[string]interface{}, fields map[string]interface{}, + mname string) { + for k, v := range values { + if v != nil { + fields[mname+"_"+k] = v + } + } +} + +func parseJmxMetricRequest(mbean string) map[string]string { + tokens := make(map[string]string) + classAndPairs := strings.Split(mbean, ":") + if classAndPairs[0] == "org.apache.cassandra.metrics" { + tokens["class"] = "cassandra" + } else if classAndPairs[0] == "java.lang" { + tokens["class"] = "java" + } else { + return tokens + } + pairs := strings.Split(classAndPairs[1], ",") + for _, pair := range pairs { + p := strings.Split(pair, "=") + tokens[p[0]] = p[1] + } + return tokens +} + +func addTokensToTags(tokens map[string]string, tags map[string]string) { + for k, v := range tokens { + if k == "name" { + tags["mname"] = v // name seems to a reserved word in influxdb + } else if k == "class" || k == "type" { + continue // class and type are used in the metric name + } else { + tags[k] = v + } + } +} + +func (j javaMetric) addTagsFields(out map[string]interface{}) { + tags := make(map[string]string) + fields := make(map[string]interface{}) + + a := out["request"].(map[string]interface{}) + attribute := a["attribute"].(string) + mbean := a["mbean"].(string) + + tokens := parseJmxMetricRequest(mbean) + addTokensToTags(tokens, tags) + addServerTags(j.server, tags) + + if _, ok := tags["mname"]; !ok { + //Queries for a single value will not return a "name" tag in the response. + tags["mname"] = attribute + } + + if values, ok := out["value"]; ok { + switch t := values.(type) { + case map[string]interface{}: + addValuesAsFields(values.(map[string]interface{}), fields, attribute) + case interface{}: + fields[attribute] = t + } + j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + j.metric.Jmx, out) + } +} + +func addCassandraMetric(mbean string, c cassandraMetric, + values map[string]interface{}) { + + tags := make(map[string]string) + fields := make(map[string]interface{}) + tokens := parseJmxMetricRequest(mbean) + addTokensToTags(tokens, tags) + addServerTags(c.server, tags) + addValuesAsFields(values, fields, tags["mname"]) + c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) + +} + +func (c cassandraMetric) addTagsFields(out map[string]interface{}) { + + r := out["request"] + + tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string)) + // Requests with wildcards for keyspace or table names will return nested + // maps in the json response + if tokens["type"] == "Table" && (tokens["keyspace"] == "*" || + tokens["scope"] == "*") { + if valuesMap, ok := out["value"]; ok { + for k, v := range valuesMap.(map[string]interface{}) { + addCassandraMetric(k, c, v.(map[string]interface{})) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + c.metric.Jmx, out) + return + } + } else { + if values, ok := out["value"]; ok { + addCassandraMetric(r.(map[string]interface{})["mbean"].(string), + c, values.(map[string]interface{})) + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + c.metric.Jmx, out) + return + } + } +} + +func (j *Cassandra) SampleConfig() string { + return ` + # This is the context root used to compose the jolokia url + context = "/jolokia/read" + + # List of cassandra servers exposing jolokia read service + [[cassandra.servers]] + # host can be skipped for localhost. host tag will be set to hostname() + host = "192.168.103.2" + port = "8180" + # username = "myuser" + # password = "mypassword" + + # List of metrics collected on above servers + # Each metric consists of a jmx path. Pass or drop slice attributes will be + # supported in the future. + # This will collect all heap memory usage metrics from the jvm + [[cassandra..metrics]] + jmx = "/java.lang:type=Memory/HeapMemoryUsage" + + # This will collect ReadLatency metrics for all keyspaces and tables. + # "type=Table" in the query works with Cassandra3.0. Older versions might need + # to use "type=ColumnFamily" + [[cassandra..metrics]] + jmx = "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadL +atency" +` +} + +func (j *Cassandra) Description() string { + return "Read Cassandra metrics through Jolokia" +} + +func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (c *Cassandra) Gather(acc telegraf.Accumulator) error { + context := c.Context + servers := c.Servers + metrics := c.Metrics + + for _, server := range servers { + for _, metric := range metrics { + var m jmxMetric + + if strings.HasPrefix(metric.Jmx, "/java.lang:") { + m = newJavaMetric(server, metric, acc) + } else if strings.HasPrefix(metric.Jmx, + "/org.apache.cassandra.metrics:") { + m = newCassandraMetric(server, metric, acc) + } + jmxPath := metric.Jmx + + // Prepare URL + requestUrl, err := url.Parse("http://" + server.Host + ":" + + server.Port + context + jmxPath) + fmt.Printf("host %s url %s\n", server.Host, requestUrl) + if err != nil { + return err + } + if server.Username != "" || server.Password != "" { + requestUrl.User = url.UserPassword(server.Username, server.Password) + } + + out, err := c.getAttr(requestUrl) + if out["status"] != 200.0 { + fmt.Printf("URL returned with status %v\n", out["status"]) + continue + } + m.addTagsFields(out) + } + } + return nil +} + +func init() { + inputs.Add("cassandra", func() telegraf.Input { + return &Cassandra{jClient: &JolokiaClientImpl{client: &http.Client{}}} + }) +} diff --git a/plugins/inputs/cassandra/cassandra_test.go b/plugins/inputs/cassandra/cassandra_test.go new file mode 100644 index 000000000..4fa12aa8f --- /dev/null +++ b/plugins/inputs/cassandra/cassandra_test.go @@ -0,0 +1,276 @@ +package cassandra + +import ( + _ "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/require" +) + +const validJavaMultiValueJSON = ` +{ + "request":{ + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":{ + "init":67108864, + "committed":456130560, + "max":477626368, + "used":203288528 + }, + "timestamp":1446129191, + "status":200 +}` + +const validCassandraMultiValueJSON = ` +{ + "request": { + "mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table,type=Table", + "type": "read"}, + "status": 200, + "timestamp": 1458089229, + "value": { + "999thPercentile": 20.0, + "99thPercentile": 10.0, + "Count": 400, + "DurationUnit": "microseconds", + "Max": 30.0, + "Mean": null, + "MeanRate": 3.0, + "Min": 1.0, + "RateUnit": "events/second", + "StdDev": null + } +}` + +const validCassandraNestedMultiValueJSON = ` +{ + "request": { + "mbean": "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=*,type=Table", + "type": "read"}, + "status": 200, + "timestamp": 1458089184, + "value": { + "org.apache.cassandra.metrics:keyspace=test_keyspace1,name=ReadLatency,scope=test_table1,type=Table": + { "999thPercentile": 1.0, + "Count": 100, + "DurationUnit": "microseconds", + "OneMinuteRate": 1.0, + "RateUnit": "events/second", + "StdDev": null + }, + "org.apache.cassandra.metrics:keyspace=test_keyspace2,name=ReadLatency,scope=test_table2,type=Table": + { "999thPercentile": 2.0, + "Count": 200, + "DurationUnit": "microseconds", + "OneMinuteRate": 2.0, + "RateUnit": "events/second", + "StdDev": null + } + } +}` + +const validSingleValueJSON = ` +{ + "request":{ + "path":"used", + "mbean":"java.lang:type=Memory", + "attribute":"HeapMemoryUsage", + "type":"read" + }, + "value":209274376, + "timestamp":1446129256, + "status":200 +}` + +const validJavaMultiTypeJSON = ` +{ + "request":{ + "mbean":"java.lang:name=ConcurrentMarkSweep,type=GarbageCollector", + "attribute":"CollectionCount", + "type":"read" + }, + "value":1, + "timestamp":1459316570, + "status":200 +}` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +var Servers = []Server{Server{Host: "10.10.10.10", Port: "8778"}} +var HeapMetric = Metric{Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} +var ReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"} +var NestedReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"} +var GarbageCollectorMetric = Metric{"/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"} +var Context = "/jolokia/read" + +type jolokiaClientStub struct { + responseBody string + statusCode int +} + +func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Cassandra { + return &Cassandra{ + jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, + Context: Context, + Servers: servers, + Metrics: metrics, + } +} + +// Test that the proper values are ignored or collected for class=Java +func TestHttpJsonJavaMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Metrics)) + + fields := map[string]interface{}{ + "HeapMemoryUsage_init": 67108864.0, + "HeapMemoryUsage_committed": 456130560.0, + "HeapMemoryUsage_max": 477626368.0, + "HeapMemoryUsage_used": 203288528.0, + } + tags := map[string]string{ + "host": "10.10.10.10", + "mname": "HeapMemoryUsage", + } + acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags) +} + +func TestHttpJsonJavaMultiType(t *testing.T) { + cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, Servers, []Metric{GarbageCollectorMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Metrics)) + + fields := map[string]interface{}{ + "CollectionCount": 1.0, + } + + tags := map[string]string{ + "host": "10.10.10.10", + "mname": "ConcurrentMarkSweep", + } + acc.AssertContainsTaggedFields(t, "javaGarbageCollector", fields, tags) +} + +// Test that the proper values are ignored or collected +func TestHttpJsonOn404(t *testing.T) { + + jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers, + []Metric{HeapMetric}) + + var acc testutil.Accumulator + err := jolokia.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 0, len(acc.Metrics)) +} + +// Test that the proper values are ignored or collected for class=Cassandra +func TestHttpJsonCassandraMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []Metric{ReadLatencyMetric}) + + var acc testutil.Accumulator + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 1, len(acc.Metrics)) + + fields := map[string]interface{}{ + "ReadLatency_999thPercentile": 20.0, + "ReadLatency_99thPercentile": 10.0, + "ReadLatency_Count": 400.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_Max": 30.0, + "ReadLatency_MeanRate": 3.0, + "ReadLatency_Min": 1.0, + "ReadLatency_RateUnit": "events/second", + } + + tags := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace1", + "scope": "test_table", + } + acc.AssertContainsTaggedFields(t, "cassandraTable", fields, tags) +} + +// Test that the proper values are ignored or collected for class=Cassandra with +// nested values +func TestHttpJsonCassandraNestedMultiValue(t *testing.T) { + cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []Metric{NestedReadLatencyMetric}) + + var acc testutil.Accumulator + acc.SetDebug(true) + err := cassandra.Gather(&acc) + + assert.Nil(t, err) + assert.Equal(t, 2, len(acc.Metrics)) + + fields1 := map[string]interface{}{ + "ReadLatency_999thPercentile": 1.0, + "ReadLatency_Count": 100.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_OneMinuteRate": 1.0, + "ReadLatency_RateUnit": "events/second", + } + + fields2 := map[string]interface{}{ + "ReadLatency_999thPercentile": 2.0, + "ReadLatency_Count": 200.0, + "ReadLatency_DurationUnit": "microseconds", + "ReadLatency_OneMinuteRate": 2.0, + "ReadLatency_RateUnit": "events/second", + } + + tags1 := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace1", + "scope": "test_table1", + } + + tags2 := map[string]string{ + "host": "10.10.10.10", + "mname": "ReadLatency", + "keyspace": "test_keyspace2", + "scope": "test_table2", + } + + acc.AssertContainsTaggedFields(t, "cassandraTable", fields1, tags1) + acc.AssertContainsTaggedFields(t, "cassandraTable", fields2, tags2) +}