From 67fe265d34c1d08f0b611c54c00102701bcc260b Mon Sep 17 00:00:00 2001 From: subhachandrachandra Date: Wed, 6 Apr 2016 23:25:08 -0700 Subject: [PATCH] Simplified cassandra plugin config to accept servers and metrics as lists of strings based on comments from @sparrc. --- Arista/ConfigFiles/telegraf-cassandra.conf | 144 +++++++++++++++++++++ plugins/inputs/cassandra/README.md | 23 +--- plugins/inputs/cassandra/cassandra.go | 123 ++++++++++-------- plugins/inputs/cassandra/cassandra_test.go | 40 +++--- 4 files changed, 241 insertions(+), 89 deletions(-) create mode 100644 Arista/ConfigFiles/telegraf-cassandra.conf diff --git a/Arista/ConfigFiles/telegraf-cassandra.conf b/Arista/ConfigFiles/telegraf-cassandra.conf new file mode 100644 index 000000000..6a97c80eb --- /dev/null +++ b/Arista/ConfigFiles/telegraf-cassandra.conf @@ -0,0 +1,144 @@ +# Telegraf configuration + +# Telegraf is entirely plugin driven. All metrics are gathered from the +# declared inputs, and sent to the declared outputs. + +# Plugins must be declared in here to be active. +# To deactivate a plugin, comment out the name and any variables. + +# Use 'telegraf -config telegraf.conf -test' to see what metrics a config +# file would generate. + +# Global tags can be specified here in key="value" format. +[tags] + # dc = "us-east-1" # will tag all metrics with dc=us-east-1 + # rack = "1a" + +# Configuration for telegraf agent +[agent] + # Default data collection interval for all inputs + interval = "5s" + # Rounds collection interval to 'interval' + # ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + + # Telegraf will cache metric_buffer_limit metrics for each output, and will + # flush this buffer on a successful write. + metric_buffer_limit = 10000 + + # Collection jitter is used to jitter the collection by a random amount. + # Each plugin will sleep for a random time within jitter before collecting. + # This can be used to avoid many plugins querying things like sysfs at the + # same time, which can have a measurable effect on the system. + collection_jitter = "0s" + + # Default data flushing interval for all outputs. You should not set this below + # interval. Maximum flush_interval will be flush_interval + flush_jitter + flush_interval = "10s" + # Jitter the flush interval by a random amount. This is primarily to avoid + # large write spikes for users running a large number of telegraf instances. + # ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + flush_jitter = "1s" + + # Run telegraf in debug mode + debug = false + # Run telegraf in quiet mode + quiet = false + # Override default hostname, if empty use os.Hostname() + hostname = "" + + +############################################################################### +# OUTPUTS # +############################################################################### + +# Configuration for influxdb server to send metrics to +[[outputs.influxdb]] + # The full HTTP or UDP endpoint URL for your InfluxDB instance. + # Multiple urls can be specified but it is assumed that they are part of the same + # cluster, this means that only ONE of the urls will be written to each interval. + # urls = ["udp://localhost:8089"] # UDP endpoint example + urls = ["http://192.168.99.1:8086"] # required + #urls = ["http://172.22.229.35:8086"] # required + # The target database for metrics (telegraf will create it if not exists) + database = "cassandra" # required + # Precision of writes, valid values are n, u, ms, s, m, and h + # note: using second precision greatly helps InfluxDB compression + precision = "s" + + # Connection timeout (for the connection with InfluxDB), formatted as a string. + # If not provided, will default to 0 (no timeout) + # timeout = "5s" + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + # Set the user agent for HTTP POSTs (can be useful for log differentiation) + # user_agent = "telegraf" + # Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes) + # udp_payload = 512 + +############################################################################### +# INPUTS # +############################################################################### +[cassandra] + context = "/jolokia/read" + servers = [":8778"] + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/java.lang:type=Memory/NonHeapMemoryUsage", + "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime", + "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount", + "/java.lang:type=GarbageCollector,name=ParNew/CollectionTime", + "/java.lang:type=GarbageCollector,name=ParNew/CollectionCount", + "/org.apache.cassandra.metrics:type=Compaction,name=CompletedTasks", + "/org.apache.cassandra.metrics:type=Compaction,name=PendingTasks", + "/org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted", + "/org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted", + "/org.apache.cassandra.metrics:type=Client,name=connectedNativeClients", + "/org.apache.cassandra.metrics:type=Storage,name=Load", + "/org.apache.cassandra.metrics:type=Storage,name=Exceptions", + "/org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks", + "/org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures", + "/org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=CoordinatorReadLatency", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=WriteLatency", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadTotalLatency", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=WriteTotalLatency", + "/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hits", + "/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests", + "/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries", + "/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size", + "/org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity", + "/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hits", + "/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests", + "/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries", + "/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size", + "/org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity", + "/org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks", + "/org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=LiveDiskSpaceUsed", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=TotalDiskSpaceUsed", + ] + +############################################################################### +# SERVICE INPUTS # +############################################################################### diff --git a/plugins/inputs/cassandra/README.md b/plugins/inputs/cassandra/README.md index f7ec4c715..49b046dc5 100644 --- a/plugins/inputs/cassandra/README.md +++ b/plugins/inputs/cassandra/README.md @@ -2,15 +2,8 @@ #### Plugin arguments: - **context** string: Context root used for jolokia url -- **servers** []Server: List of servers - + **host** string: Server's ip address or hostname - + **port** string: Server's listening port - + **username** string: Server's username for authentication (optional) - + **password** string: Server's password for authentication (optional) -- **metrics** []Metric - + **jmx** string: Jmx path that identifies mbeans attributes - + **pass** []string: Attributes to retain when collecting values (TODO) - + **drop** []string: Attributes to drop when collecting values (TODO) +- **servers** []string: List of servers with the format ":port" +- **metrics** []string: List of Jmx paths that identify mbeans attributes #### Description @@ -24,14 +17,10 @@ Cassandra plugin produces one or more measurements for each metric configured, a Given a configuration like: ```ini -[cassandra] - -[[cassandra.servers]] - host = "127.0.0.1" - port = "878" - -[[cassandra.metrics]] - jmx = "/java.lang:type=Memory/HeapMemoryUsage" +[inputs.cassandra] + context = "/jolokia/read" + servers = [":8778"] + metrics = ["/java.lang:type=Memory/HeapMemoryUsage"] ``` The collected metrics will be: diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go index ce9230ff3..71f0b17a7 100644 --- a/plugins/inputs/cassandra/cassandra.go +++ b/plugins/inputs/cassandra/cassandra.go @@ -13,16 +13,12 @@ import ( "strings" ) -type Server struct { +/*type Server struct { Host string Username string Password string Port string -} - -type Metric struct { - Jmx string -} +}*/ type JolokiaClient interface { MakeRequest(req *http.Request) (*http.Response, error) @@ -39,19 +35,19 @@ func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error type Cassandra struct { jClient JolokiaClient Context string - Servers []Server - Metrics []Metric + Servers []string + Metrics []string } type javaMetric struct { - server Server - metric Metric + host string + metric string acc telegraf.Accumulator } type cassandraMetric struct { - server Server - metric Metric + host string + metric string acc telegraf.Accumulator } @@ -59,21 +55,20 @@ type jmxMetric interface { addTagsFields(out map[string]interface{}) } -func addServerTags(server Server, tags map[string]string) { - if server.Host != "" && server.Host != "localhost" && - server.Host != "127.0.0.1" { - tags["host"] = server.Host +func addServerTags(host string, tags map[string]string) { + if host != "" && host != "localhost" && host != "127.0.0.1" { + tags["host"] = host } } -func newJavaMetric(server Server, metric Metric, +func newJavaMetric(host string, metric string, acc telegraf.Accumulator) *javaMetric { - return &javaMetric{server: server, metric: metric, acc: acc} + return &javaMetric{host: host, metric: metric, acc: acc} } -func newCassandraMetric(server Server, metric Metric, +func newCassandraMetric(host string, metric string, acc telegraf.Accumulator) *cassandraMetric { - return &cassandraMetric{server: server, metric: metric, acc: acc} + return &cassandraMetric{host: host, metric: metric, acc: acc} } func addValuesAsFields(values map[string]interface{}, fields map[string]interface{}, @@ -125,7 +120,7 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) { tokens := parseJmxMetricRequest(mbean) addTokensToTags(tokens, tags) - addServerTags(j.server, tags) + addServerTags(j.host, tags) if _, ok := tags["mname"]; !ok { //Queries for a single value will not return a "name" tag in the response. @@ -142,7 +137,7 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) { j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) } else { fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - j.metric.Jmx, out) + j.metric, out) } } @@ -153,7 +148,7 @@ func addCassandraMetric(mbean string, c cassandraMetric, fields := make(map[string]interface{}) tokens := parseJmxMetricRequest(mbean) addTokensToTags(tokens, tags) - addServerTags(c.server, tags) + addServerTags(c.host, tags) addValuesAsFields(values, fields, tags["mname"]) c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) @@ -174,7 +169,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { } } else { fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - c.metric.Jmx, out) + c.metric, out) return } } else { @@ -183,7 +178,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { c, values.(map[string]interface{})) } else { fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - c.metric.Jmx, out) + c.metric, out) return } } @@ -193,28 +188,16 @@ func (j *Cassandra) SampleConfig() string { return ` # This is the context root used to compose the jolokia url context = "/jolokia/read" - - # List of cassandra servers exposing jolokia read service - [[cassandra.servers]] - # host can be skipped for localhost. host tag will be set to hostname() - host = "192.168.103.2" - port = "8180" - # username = "myuser" - # password = "mypassword" - - # List of metrics collected on above servers - # Each metric consists of a jmx path. Pass or drop slice attributes will be - # supported in the future. - # This will collect all heap memory usage metrics from the jvm - [[cassandra..metrics]] - jmx = "/java.lang:type=Memory/HeapMemoryUsage" - - # This will collect ReadLatency metrics for all keyspaces and tables. - # "type=Table" in the query works with Cassandra3.0. Older versions might need - # to use "type=ColumnFamily" - [[cassandra..metrics]] - jmx = "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadL -atency" + ## List of cassandra servers exposing jolokia read service + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + ## List of metrics collected on above servers + ## Each metric consists of a jmx path. + ## This will collect all heap memory usage metrics from the jvm and + ## ReadLatency metrics for all keyspaces and tables. + ## "type=Table" in the query works with Cassandra3.0. Older versions might + ## need to use "type=ColumnFamily" + metrics = ["/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"] ` } @@ -261,6 +244,30 @@ func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error) return jsonOut, nil } +func parseServerTokens(server string) map[string]string { + serverTokens := make(map[string]string) + + hostAndUser := strings.Split(server, "@") + hostPort := "" + userPasswd := "" + if len(hostAndUser) == 2 { + hostPort = hostAndUser[1] + userPasswd = hostAndUser[0] + } else { + hostPort = hostAndUser[0] + } + hostTokens := strings.Split(hostPort, ":") + serverTokens["host"] = hostTokens[0] + serverTokens["port"] = hostTokens[1] + + if userPasswd != "" { + userTokens := strings.Split(userPasswd, ":") + serverTokens["user"] = userTokens[0] + serverTokens["passwd"] = userTokens[1] + } + return serverTokens +} + func (c *Cassandra) Gather(acc telegraf.Accumulator) error { context := c.Context servers := c.Servers @@ -270,24 +277,26 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { for _, metric := range metrics { var m jmxMetric - if strings.HasPrefix(metric.Jmx, "/java.lang:") { - m = newJavaMetric(server, metric, acc) - } else if strings.HasPrefix(metric.Jmx, + serverTokens := parseServerTokens(server) + + if strings.HasPrefix(metric, "/java.lang:") { + m = newJavaMetric(serverTokens["host"], metric, acc) + } else if strings.HasPrefix(metric, "/org.apache.cassandra.metrics:") { - m = newCassandraMetric(server, metric, acc) + m = newCassandraMetric(serverTokens["host"], metric, acc) } - jmxPath := metric.Jmx // Prepare URL - requestUrl, err := url.Parse("http://" + server.Host + ":" + - server.Port + context + jmxPath) - fmt.Printf("host %s url %s\n", server.Host, requestUrl) + requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + + serverTokens["port"] + context + metric) if err != nil { return err } - if server.Username != "" || server.Password != "" { - requestUrl.User = url.UserPassword(server.Username, server.Password) + if serverTokens["user"] != "" && serverTokens["passwd"] != "" { + requestUrl.User = url.UserPassword(serverTokens["user"], + serverTokens["passwd"]) } + fmt.Printf("host %s url %s\n", serverTokens["host"], requestUrl) out, err := c.getAttr(requestUrl) if out["status"] != 200.0 { diff --git a/plugins/inputs/cassandra/cassandra_test.go b/plugins/inputs/cassandra/cassandra_test.go index 4fa12aa8f..184fa3bbb 100644 --- a/plugins/inputs/cassandra/cassandra_test.go +++ b/plugins/inputs/cassandra/cassandra_test.go @@ -106,11 +106,14 @@ const invalidJSON = "I don't think this is JSON" const empty = "" -var Servers = []Server{Server{Host: "10.10.10.10", Port: "8778"}} -var HeapMetric = Metric{Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} -var ReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency"} -var NestedReadLatencyMetric = Metric{"/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency"} -var GarbageCollectorMetric = Metric{"/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount"} +var Servers = []string{"10.10.10.10:8778"} +var AuthServers = []string{"user:passwd@10.10.10.10:8778"} +var MultipleServers = []string{"10.10.10.10:8778", "10.10.10.11:8778"} +var HeapMetric = "/java.lang:type=Memory/HeapMemoryUsage" +var ReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=test_table,name=ReadLatency" +var NestedReadLatencyMetric = "/org.apache.cassandra.metrics:type=Table,keyspace=test_keyspace1,scope=*,name=ReadLatency" +var GarbageCollectorMetric1 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount" +var GarbageCollectorMetric2 = "/java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime" var Context = "/jolokia/read" type jolokiaClientStub struct { @@ -132,7 +135,7 @@ func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error // // Returns: // *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client -func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Cassandra { +func genJolokiaClientStub(response string, statusCode int, servers []string, metrics []string) *Cassandra { return &Cassandra{ jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode}, Context: Context, @@ -143,14 +146,15 @@ func genJolokiaClientStub(response string, statusCode int, servers []Server, met // Test that the proper values are ignored or collected for class=Java func TestHttpJsonJavaMultiValue(t *testing.T) { - cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200, Servers, []Metric{HeapMetric}) + cassandra := genJolokiaClientStub(validJavaMultiValueJSON, 200, + MultipleServers, []string{HeapMetric}) var acc testutil.Accumulator acc.SetDebug(true) err := cassandra.Gather(&acc) assert.Nil(t, err) - assert.Equal(t, 1, len(acc.Metrics)) + assert.Equal(t, 2, len(acc.Metrics)) fields := map[string]interface{}{ "HeapMemoryUsage_init": 67108864.0, @@ -158,22 +162,28 @@ func TestHttpJsonJavaMultiValue(t *testing.T) { "HeapMemoryUsage_max": 477626368.0, "HeapMemoryUsage_used": 203288528.0, } - tags := map[string]string{ + tags1 := map[string]string{ "host": "10.10.10.10", "mname": "HeapMemoryUsage", } - acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags) + + tags2 := map[string]string{ + "host": "10.10.10.11", + "mname": "HeapMemoryUsage", + } + acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags1) + acc.AssertContainsTaggedFields(t, "javaMemory", fields, tags2) } func TestHttpJsonJavaMultiType(t *testing.T) { - cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, Servers, []Metric{GarbageCollectorMetric}) + cassandra := genJolokiaClientStub(validJavaMultiTypeJSON, 200, AuthServers, []string{GarbageCollectorMetric1, GarbageCollectorMetric2}) var acc testutil.Accumulator acc.SetDebug(true) err := cassandra.Gather(&acc) assert.Nil(t, err) - assert.Equal(t, 1, len(acc.Metrics)) + assert.Equal(t, 2, len(acc.Metrics)) fields := map[string]interface{}{ "CollectionCount": 1.0, @@ -190,7 +200,7 @@ func TestHttpJsonJavaMultiType(t *testing.T) { func TestHttpJsonOn404(t *testing.T) { jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers, - []Metric{HeapMetric}) + []string{HeapMetric}) var acc testutil.Accumulator err := jolokia.Gather(&acc) @@ -201,7 +211,7 @@ func TestHttpJsonOn404(t *testing.T) { // Test that the proper values are ignored or collected for class=Cassandra func TestHttpJsonCassandraMultiValue(t *testing.T) { - cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []Metric{ReadLatencyMetric}) + cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric}) var acc testutil.Accumulator err := cassandra.Gather(&acc) @@ -232,7 +242,7 @@ func TestHttpJsonCassandraMultiValue(t *testing.T) { // Test that the proper values are ignored or collected for class=Cassandra with // nested values func TestHttpJsonCassandraNestedMultiValue(t *testing.T) { - cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []Metric{NestedReadLatencyMetric}) + cassandra := genJolokiaClientStub(validCassandraNestedMultiValueJSON, 200, Servers, []string{NestedReadLatencyMetric}) var acc testutil.Accumulator acc.SetDebug(true)