Simplified cassandra plugin config to accept servers and metrics as lists of strings based on comments from @sparrc.

This commit is contained in:
subhachandrachandra 2016-04-06 23:25:08 -07:00 committed by Subhachandra Chandra
parent 44f35f1d35
commit 67fe265d34
4 changed files with 241 additions and 89 deletions

View File

@ -0,0 +1,144 @@
# Telegraf configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
# Global tags can be specified here in key="value" format.
[tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
# Configuration for telegraf agent
[agent]
# Default data collection interval for all inputs
interval = "5s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Telegraf will cache metric_buffer_limit metrics for each output, and will
# flush this buffer on a successful write.
metric_buffer_limit = 10000
# Collection jitter is used to jitter the collection by a random amount.
# Each plugin will sleep for a random time within jitter before collecting.
# This can be used to avoid many plugins querying things like sysfs at the
# same time, which can have a measurable effect on the system.
collection_jitter = "0s"
# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
# Jitter the flush interval by a random amount. This is primarily to avoid
# large write spikes for users running a large number of telegraf instances.
# ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "1s"
# Run telegraf in debug mode
debug = false
# Run telegraf in quiet mode
quiet = false
# Override default hostname, if empty use os.Hostname()
hostname = ""
###############################################################################
# OUTPUTS #
###############################################################################
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
# The full HTTP or UDP endpoint URL for your InfluxDB instance.
# Multiple urls can be specified but it is assumed that they are part of the same
# cluster, this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://192.168.99.1:8086"] # required
#urls = ["http://172.22.229.35:8086"] # required
# The target database for metrics (telegraf will create it if it does not exist)
database = "cassandra" # required
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
###############################################################################
# INPUTS #
###############################################################################
[cassandra]
context = "/jolokia/read"
servers = [":8778"]
metrics = [
"/java.lang:type=Memory/HeapMemoryUsage",
"/java.lang:type=Memory/NonHeapMemoryUsage",
"/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime",
"/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount",
"/java.lang:type=GarbageCollector,name=ParNew/CollectionTime",
"/java.lang:type=GarbageCollector,name=ParNew/CollectionCount",
"/org.apache.cassandra.metrics:type=Compaction,name=CompletedTasks",
"/org.apache.cassandra.metrics:type=Compaction,name=PendingTasks",
"/org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted",
"/org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted",
"/org.apache.cassandra.metrics:type=Client,name=connectedNativeClients",
"/org.apache.cassandra.metrics:type=Storage,name=Load",
"/org.apache.cassandra.metrics:type=Storage,name=Exceptions",
"/org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks",
"/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures",
"/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=CoordinatorReadLatency",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=WriteLatency",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadTotalLatency",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=WriteTotalLatency",
"/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hits",
"/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests",
"/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries",
"/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size",
"/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity",
"/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hits",
"/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests",
"/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries",
"/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size",
"/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity",
"/org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks",
"/org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=LiveDiskSpaceUsed",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=TotalDiskSpaceUsed",
]
###############################################################################
# SERVICE INPUTS #
###############################################################################

View File

@ -2,15 +2,8 @@
#### Plugin arguments:
- **context** string: Context root used for jolokia url
- **servers** []Server: List of servers
+ **host** string: Server's ip address or hostname
+ **port** string: Server's listening port
+ **username** string: Server's username for authentication (optional)
+ **password** string: Server's password for authentication (optional)
- **metrics** []Metric
+ **jmx** string: Jmx path that identifies mbeans attributes
+ **pass** []string: Attributes to retain when collecting values (TODO)
+ **drop** []string: Attributes to drop when collecting values (TODO)
- **servers** []string: List of servers in the format "[user:passwd@]host:port" (credentials are optional; an empty host such as ":8778" is allowed)
- **metrics** []string: List of JMX paths that identify MBean attributes
#### Description
@ -24,14 +17,10 @@ Cassandra plugin produces one or more measurements for each metric configured, a
Given a configuration like:
```ini
[cassandra]
[[cassandra.servers]]
host = "127.0.0.1"
port = "878"
[[cassandra.metrics]]
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
[inputs.cassandra]
context = "/jolokia/read"
servers = [":8778"]
metrics = ["/java.lang:type=Memory/HeapMemoryUsage"]
```
The collected metrics will be:

View File

@ -13,16 +13,12 @@ import (
"strings"
)
type Server struct {
/*type Server struct {
Host string
Username string
Password string
Port string
}
type Metric struct {
Jmx string
}
}*/
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
@ -39,19 +35,19 @@ func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error
type Cassandra struct {
jClient JolokiaClient
Context string
Servers []Server
Metrics []Metric
Servers []string
Metrics []string
}
type javaMetric struct {
server Server
metric Metric
host string
metric string
acc telegraf.Accumulator
}
type cassandraMetric struct {
server Server
metric Metric
host string
metric string
acc telegraf.Accumulator
}
@ -59,21 +55,20 @@ type jmxMetric interface {
addTagsFields(out map[string]interface{})
}
func addServerTags(server Server, tags map[string]string) {
if server.Host != "" && server.Host != "localhost" &&
server.Host != "127.0.0.1" {
tags["host"] = server.Host
func addServerTags(host string, tags map[string]string) {
if host != "" && host != "localhost" && host != "127.0.0.1" {
tags["host"] = host
}
}
func newJavaMetric(server Server, metric Metric,
func newJavaMetric(host string, metric string,
acc telegraf.Accumulator) *javaMetric {
return &javaMetric{server: server, metric: metric, acc: acc}
return &javaMetric{host: host, metric: metric, acc: acc}
}
func newCassandraMetric(server Server, metric Metric,
func newCassandraMetric(host string, metric string,
acc telegraf.Accumulator) *cassandraMetric {
return &cassandraMetric{server: server, metric: metric, acc: acc}
return &cassandraMetric{host: host, metric: metric, acc: acc}
}
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
@ -125,7 +120,7 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) {
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
addServerTags(j.server, tags)
addServerTags(j.host, tags)
if _, ok := tags["mname"]; !ok {
//Queries for a single value will not return a "name" tag in the response.
@ -142,7 +137,7 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) {
j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
j.metric.Jmx, out)
j.metric, out)
}
}
@ -153,7 +148,7 @@ func addCassandraMetric(mbean string, c cassandraMetric,
fields := make(map[string]interface{})
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
addServerTags(c.server, tags)
addServerTags(c.host, tags)
addValuesAsFields(values, fields, tags["mname"])
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
@ -174,7 +169,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric.Jmx, out)
c.metric, out)
return
}
} else {
@ -183,7 +178,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
c, values.(map[string]interface{}))
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric.Jmx, out)
c.metric, out)
return
}
}
@ -193,28 +188,16 @@ func (j *Cassandra) SampleConfig() string {
return `
# This is the context root used to compose the jolokia url
context = "/jolokia/read"
# List of cassandra servers exposing jolokia read service
[[cassandra.servers]]
# host can be skipped for localhost. host tag will be set to hostname()
host = "192.168.103.2"
port = "8180"
# username = "myuser"
# password = "mypassword"
# List of metrics collected on above servers
# Each metric consists of a jmx path. Pass or drop slice attributes will be
# supported in the future.
# This will collect all heap memory usage metrics from the jvm
[[cassandra..metrics]]
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
# This will collect ReadLatency metrics for all keyspaces and tables.
# "type=Table" in the query works with Cassandra3.0. Older versions might need
# to use "type=ColumnFamily"
[[cassandra..metrics]]
jmx = "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadL
atency"
## List of cassandra servers exposing jolokia read service
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
## List of metrics collected on above servers
## Each metric consists of a jmx path.
## This will collect all heap memory usage metrics from the jvm and
## ReadLatency metrics for all keyspaces and tables.
## "type=Table" in the query works with Cassandra3.0. Older versions might
## need to use "type=ColumnFamily"
metrics = ["/java.lang:type=Memory/HeapMemoryUsage",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"]
`
}
@ -261,6 +244,30 @@ func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error)
return jsonOut, nil
}
// parseServerTokens splits a server specification of the form
// "[user:passwd@]host:port" into its components.
//
// The returned map always contains the keys "host" and "port". When a
// non-empty credentials part precedes the "@", it also contains the keys
// "user" and "passwd". Pieces that are missing from the input (no port, no
// password) are returned as empty strings rather than causing an
// index-out-of-range panic.
func parseServerTokens(server string) map[string]string {
	serverTokens := make(map[string]string, 4)

	hostPort := server
	userPasswd := ""
	// Split on the LAST "@" so that a password containing "@" stays with the
	// credentials instead of being mistaken for part of the host.
	if at := strings.LastIndex(server, "@"); at >= 0 {
		userPasswd = server[:at]
		hostPort = server[at+1:]
	}

	// Split on the LAST ":" so that a bracketed IPv6 host such as
	// "[::1]:8778" keeps its inner colons. A value ending in "]" is a
	// bracketed host without any port.
	colon := strings.LastIndex(hostPort, ":")
	if colon >= 0 && !strings.HasSuffix(hostPort, "]") {
		serverTokens["host"] = hostPort[:colon]
		serverTokens["port"] = hostPort[colon+1:]
	} else {
		serverTokens["host"] = hostPort
		serverTokens["port"] = ""
	}

	if userPasswd != "" {
		// Only the first ":" separates the user from the password, so the
		// password itself may contain further colons.
		userTokens := strings.SplitN(userPasswd, ":", 2)
		serverTokens["user"] = userTokens[0]
		serverTokens["passwd"] = ""
		if len(userTokens) == 2 {
			serverTokens["passwd"] = userTokens[1]
		}
	}
	return serverTokens
}
func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
context := c.Context
servers := c.Servers
@ -270,24 +277,26 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
for _, metric := range metrics {
var m jmxMetric
if strings.HasPrefix(metric.Jmx, "/java.lang:") {
m = newJavaMetric(server, metric, acc)
} else if strings.HasPrefix(metric.Jmx,
serverTokens := parseServerTokens(server)
if strings.HasPrefix(metric, "/java.lang:") {
m = newJavaMetric(serverTokens["host"], metric, acc)
} else if strings.HasPrefix(metric,
"/org.apache.cassandra.metrics:") {
m = newCassandraMetric(server, metric, acc)
m = newCassandraMetric(serverTokens["host"], metric, acc)
}
jmxPath := metric.Jmx
// Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" +
server.Port + context + jmxPath)
fmt.Printf("host %s url %s\n", server.Host, requestUrl)
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
serverTokens["port"] + context + metric)
if err != nil {
return err
}
if server.Username != "" || server.Password != "" {
requestUrl.User = url.UserPassword(server.Username, server.Password)
if serverTokens["user"] != "" && serverTokens["passwd"] != "" {
requestUrl.User = url.UserPassword(serverTokens["user"],
serverTokens["passwd"])
}
fmt.Printf("host %s url %s\n", serverTokens["host"], requestUrl)
out, err := c.getAttr(requestUrl)
if out["status"] != 200.0 {

View File

@ -106,11 +106,14 @@ const invalidJSON = "I don't think this is JSON"
const empty = ""
var Servers = []Server{Server{Host: "10.10.10.10", Port: "8778"}}
var HeapMetric = Metric{Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var ReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"}
var NestedReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"}
var GarbageCollectorMetric = Metric{"/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"}
var Servers = []string{"10.10.10.10:8778"}
var AuthServers = []string{"user:passwd@10.10.10.10:8778"}
var MultipleServers = []string{"10.10.10.10:8778", "10.10.10.11:8778"}
var HeapMetric = "/java.lang:type=Memory/HeapMemoryUsage"
var ReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"
var NestedReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"
var GarbageCollectorMetric1 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"
var GarbageCollectorMetric2 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime"
var Context = "/jolokia/read"
type jolokiaClientStub struct {
@ -132,7 +135,7 @@ func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Cassandra {
func genJolokiaClientStub(response string, statusCode int, servers []string, metrics []string) *Cassandra {
return &Cassandra{
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
Context: Context,
@ -143,14 +146,15 @@ func genJolokiaClientStub(response string, statusCode int, servers []Server, met
// Test that the proper values are ignored or collected for class=Java
func TestHttpJsonJavaMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200, Servers, []Metric{HeapMetric})
cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200,
MultipleServers, []string{HeapMetric})
var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Metrics))
assert.Equal(t, 2, len(acc.Metrics))
fields := map[string]interface{}{
"HeapMemoryUsage_init": 67108864.0,
@ -158,22 +162,28 @@ func TestHttpJsonJavaMultiValue(t *testing.T) {
"HeapMemoryUsage_max": 477626368.0,
"HeapMemoryUsage_used": 203288528.0,
}
tags := map[string]string{
tags1 := map[string]string{
"host": "10.10.10.10",
"mname": "HeapMemoryUsage",
}
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags)
tags2 := map[string]string{
"host": "10.10.10.11",
"mname": "HeapMemoryUsage",
}
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags1)
acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags2)
}
func TestHttpJsonJavaMultiType(t *testing.T) {
cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, Servers, []Metric{GarbageCollectorMetric})
cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, AuthServers, []string{GarbageCollectorMetric1, GarbageCollectorMetric2})
var acc testutil.Accumulator
acc.SetDebug(true)
err := cassandra.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Metrics))
assert.Equal(t, 2, len(acc.Metrics))
fields := map[string]interface{}{
"CollectionCount": 1.0,
@ -190,7 +200,7 @@ func TestHttpJsonJavaMultiType(t *testing.T) {
func TestHttpJsonOn404(t *testing.T) {
jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers,
[]Metric{HeapMetric})
[]string{HeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
@ -201,7 +211,7 @@ func TestHttpJsonOn404(t *testing.T) {
// Test that the proper values are ignored or collected for class=Cassandra
func TestHttpJsonCassandraMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []Metric{ReadLatencyMetric})
cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric})
var acc testutil.Accumulator
err := cassandra.Gather(&acc)
@ -232,7 +242,7 @@ func TestHttpJsonCassandraMultiValue(t *testing.T) {
// Test that the proper values are ignored or collected for class=Cassandra with
// nested values
func TestHttpJsonCassandraNestedMultiValue(t *testing.T) {
cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []Metric{NestedReadLatencyMetric})
cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []string{NestedReadLatencyMetric})
var acc testutil.Accumulator
acc.SetDebug(true)