diff --git a/README.md b/README.md index d93494ecb..9b1e7b171 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,7 @@ Currently implemented sources: * raindrops * redis * rethinkdb +* riak * sql server (microsoft) * twemproxy * zfs diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 74331e54b..79deb7c99 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -40,6 +40,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" + _ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" diff --git a/plugins/inputs/riak/README.md b/plugins/inputs/riak/README.md new file mode 100644 index 000000000..3aa39ae09 --- /dev/null +++ b/plugins/inputs/riak/README.md @@ -0,0 +1,76 @@ +# Riak Plugin + +The Riak plugin gathers metrics from one or more riak instances. + +### Configuration: + +```toml +# Description +[[inputs.riak]] + # Specify a list of one or more riak http servers + servers = ["http://localhost:8098"] +``` + +### Measurements & Fields: + +Riak provides one measurement named "riak", with the following fields: + +- cpu_avg1 +- cpu_avg15 +- cpu_avg5 +- memory_code +- memory_ets +- memory_processes +- memory_system +- memory_total +- node_get_fsm_objsize_100 +- node_get_fsm_objsize_95 +- node_get_fsm_objsize_99 +- node_get_fsm_objsize_mean +- node_get_fsm_objsize_median +- node_get_fsm_siblings_100 +- node_get_fsm_siblings_95 +- node_get_fsm_siblings_99 +- node_get_fsm_siblings_mean +- node_get_fsm_siblings_median +- node_get_fsm_time_100 +- node_get_fsm_time_95 +- node_get_fsm_time_99 +- node_get_fsm_time_mean +- node_get_fsm_time_median +- node_gets +- node_gets_total +- node_put_fsm_time_100 +- node_put_fsm_time_95 +- node_put_fsm_time_99 +- node_put_fsm_time_mean +- node_put_fsm_time_median +- node_puts +- node_puts_total +- pbc_active +- pbc_connects +- pbc_connects_total +- vnode_gets +- vnode_gets_total +- vnode_index_reads +- vnode_index_reads_total +- vnode_index_writes +- vnode_index_writes_total +- vnode_puts +- vnode_puts_total + +Measurements of time (such as node_get_fsm_time_mean) are measured in nanoseconds. + +### Tags: + +All measurements have the following tags: + +- server (the host:port of the given server address, ex. `127.0.0.1:8087`) +- nodename (the internal node name received, ex. `riak@127.0.0.1` ) + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter riak -test +> riak,nodename=riak@127.0.0.1,server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332 +```gt \ No newline at end of file diff --git a/plugins/inputs/riak/riak.go b/plugins/inputs/riak/riak.go new file mode 100644 index 000000000..6750c75a0 --- /dev/null +++ b/plugins/inputs/riak/riak.go @@ -0,0 +1,196 @@ +package riak + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Type Riak gathers statistics from one or more Riak instances +type Riak struct { + // Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098) + Servers []string + + client *http.Client +} + +// NewRiak return a new instance of Riak with a default http client +func NewRiak() *Riak { + return &Riak{client: http.DefaultClient} +} + +// Type riakStats represents the data that is received from Riak +type riakStats struct { + CpuAvg1 int64 `json:"cpu_avg1"` + CpuAvg15 int64 `json:"cpu_avg15"` + CpuAvg5 int64 `json:"cpu_avg5"` + MemoryCode int64 `json:"memory_code"` + MemoryEts int64 `json:"memory_ets"` + MemoryProcesses int64 `json:"memory_processes"` + MemorySystem int64 `json:"memory_system"` + MemoryTotal int64 `json:"memory_total"` + NodeGetFsmObjsize100 int64 `json:"node_get_fsm_objsize_100"` + NodeGetFsmObjsize95 int64 `json:"node_get_fsm_objsize_95"` + NodeGetFsmObjsize99 int64 `json:"node_get_fsm_objsize_99"` + NodeGetFsmObjsizeMean int64 `json:"node_get_fsm_objsize_mean"` + NodeGetFsmObjsizeMedian int64 `json:"node_get_fsm_objsize_median"` + NodeGetFsmSiblings100 int64 `json:"node_get_fsm_siblings_100"` + NodeGetFsmSiblings95 int64 `json:"node_get_fsm_siblings_95"` + NodeGetFsmSiblings99 int64 `json:"node_get_fsm_siblings_99"` + NodeGetFsmSiblingsMean int64 `json:"node_get_fsm_siblings_mean"` + NodeGetFsmSiblingsMedian int64 `json:"node_get_fsm_siblings_median"` + NodeGetFsmTime100 int64 `json:"node_get_fsm_time_100"` + NodeGetFsmTime95 int64 `json:"node_get_fsm_time_95"` + NodeGetFsmTime99 int64 `json:"node_get_fsm_time_99"` + NodeGetFsmTimeMean int64 `json:"node_get_fsm_time_mean"` + NodeGetFsmTimeMedian int64 `json:"node_get_fsm_time_median"` + NodeGets int64 `json:"node_gets"` + NodeGetsTotal int64 `json:"node_gets_total"` + Nodename string `json:"nodename"` + NodePutFsmTime100 int64 `json:"node_put_fsm_time_100"` + NodePutFsmTime95 int64 `json:"node_put_fsm_time_95"` + NodePutFsmTime99 int64 `json:"node_put_fsm_time_99"` + NodePutFsmTimeMean int64 `json:"node_put_fsm_time_mean"` + NodePutFsmTimeMedian int64 `json:"node_put_fsm_time_median"` + NodePuts int64 `json:"node_puts"` + NodePutsTotal int64 `json:"node_puts_total"` + PbcActive int64 `json:"pbc_active"` + PbcConnects int64 `json:"pbc_connects"` + PbcConnectsTotal int64 `json:"pbc_connects_total"` + VnodeGets int64 `json:"vnode_gets"` + VnodeGetsTotal int64 `json:"vnode_gets_total"` + VnodeIndexReads int64 `json:"vnode_index_reads"` + VnodeIndexReadsTotal int64 `json:"vnode_index_reads_total"` + VnodeIndexWrites int64 `json:"vnode_index_writes"` + VnodeIndexWritesTotal int64 `json:"vnode_index_writes_total"` + VnodePuts int64 `json:"vnode_puts"` + VnodePutsTotal int64 `json:"vnode_puts_total"` +} + +// A sample configuration to only gather stats from localhost, default port. +const sampleConfig = ` + # Specify a list of one or more riak http servers + servers = ["http://localhost:8098"] +` + +// Returns a sample configuration for the plugin +func (r *Riak) SampleConfig() string { + return sampleConfig +} + +// Returns a description of the plugin +func (r *Riak) Description() string { + return "Read metrics one or many Riak servers" +} + +// Reads stats from all configured servers. +func (r *Riak) Gather(acc telegraf.Accumulator) error { + // Default to a single server at localhost (default port) if none specified + if len(r.Servers) == 0 { + r.Servers = []string{"http://127.0.0.1:8098"} + } + + // Range over all servers, gathering stats. Returns early in case of any error. + for _, s := range r.Servers { + if err := r.gatherServer(s, acc); err != nil { + return err + } + } + + return nil +} + +// Gathers stats from a single server, adding them to the accumulator +func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error { + // Parse the given URL to extract the server tag + u, err := url.Parse(s) + if err != nil { + return fmt.Errorf("riak unable to parse given server url %s: %s", s, err) + } + + // Perform the GET request to the riak /stats endpoint + resp, err := r.client.Get(s + "/stats") + if err != nil { + return err + } + defer resp.Body.Close() + + // Successful responses will always return status code 200 + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("riak responded with unexepcted status code %d", resp.StatusCode) + } + + // Decode the response JSON into a new stats struct + stats := &riakStats{} + if err := json.NewDecoder(resp.Body).Decode(stats); err != nil { + return fmt.Errorf("unable to decode riak response: %s", err) + } + + // Build a map of tags + tags := map[string]string{ + "nodename": stats.Nodename, + "server": u.Host, + } + + // Build a map of field values + fields := map[string]interface{}{ + "cpu_avg1": stats.CpuAvg1, + "cpu_avg15": stats.CpuAvg15, + "cpu_avg5": stats.CpuAvg5, + "memory_code": stats.MemoryCode, + "memory_ets": stats.MemoryEts, + "memory_processes": stats.MemoryProcesses, + "memory_system": stats.MemorySystem, + "memory_total": stats.MemoryTotal, + "node_get_fsm_objsize_100": stats.NodeGetFsmObjsize100, + "node_get_fsm_objsize_95": stats.NodeGetFsmObjsize95, + "node_get_fsm_objsize_99": stats.NodeGetFsmObjsize99, + "node_get_fsm_objsize_mean": stats.NodeGetFsmObjsizeMean, + "node_get_fsm_objsize_median": stats.NodeGetFsmObjsizeMedian, + "node_get_fsm_siblings_100": stats.NodeGetFsmSiblings100, + "node_get_fsm_siblings_95": stats.NodeGetFsmSiblings95, + "node_get_fsm_siblings_99": stats.NodeGetFsmSiblings99, + "node_get_fsm_siblings_mean": stats.NodeGetFsmSiblingsMean, + "node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian, + "node_get_fsm_time_100": stats.NodeGetFsmTime100, + "node_get_fsm_time_95": stats.NodeGetFsmTime95, + "node_get_fsm_time_99": stats.NodeGetFsmTime99, + "node_get_fsm_time_mean": stats.NodeGetFsmTimeMean, + "node_get_fsm_time_median": stats.NodeGetFsmTimeMedian, + "node_gets": stats.NodeGets, + "node_gets_total": stats.NodeGetsTotal, + "node_put_fsm_time_100": stats.NodePutFsmTime100, + "node_put_fsm_time_95": stats.NodePutFsmTime95, + "node_put_fsm_time_99": stats.NodePutFsmTime99, + "node_put_fsm_time_mean": stats.NodePutFsmTimeMean, + "node_put_fsm_time_median": stats.NodePutFsmTimeMedian, + "node_puts": stats.NodePuts, + "node_puts_total": stats.NodePutsTotal, + "pbc_active": stats.PbcActive, + "pbc_connects": stats.PbcConnects, + "pbc_connects_total": stats.PbcConnectsTotal, + "vnode_gets": stats.VnodeGets, + "vnode_gets_total": stats.VnodeGetsTotal, + "vnode_index_reads": stats.VnodeIndexReads, + "vnode_index_reads_total": stats.VnodeIndexReadsTotal, + "vnode_index_writes": stats.VnodeIndexWrites, + "vnode_index_writes_total": stats.VnodeIndexWritesTotal, + "vnode_puts": stats.VnodePuts, + "vnode_puts_total": stats.VnodePutsTotal, + } + + // Accumulate the tags and values + acc.AddFields("riak", fields, tags) + + return nil +} + +func init() { + inputs.Add("riak", func() telegraf.Input { + return NewRiak() + }) +} diff --git a/plugins/inputs/riak/riak_test.go b/plugins/inputs/riak/riak_test.go new file mode 100644 index 000000000..f92a98973 --- /dev/null +++ b/plugins/inputs/riak/riak_test.go @@ -0,0 +1,276 @@ +package riak + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/require" +) + +func TestRiak(t *testing.T) { + // Create a test server with the const response JSON + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, response) + })) + defer ts.Close() + + // Parse the URL of the test server, used to verify the expected host + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + // Create a new Riak instance with our given test server + riak := NewRiak() + riak.Servers = []string{ts.URL} + + // Create a test accumulator + acc := &testutil.Accumulator{} + + // Gather data from the test server + err = riak.Gather(acc) + require.NoError(t, err) + + // Expect the correct values for all known keys + expectFields := map[string]interface{}{ + "cpu_avg1": int64(504), + "cpu_avg15": int64(294), + "cpu_avg5": int64(325), + "memory_code": int64(12329143), + "memory_ets": int64(17330176), + "memory_processes": int64(58454730), + "memory_system": int64(120401678), + "memory_total": int64(178856408), + "node_get_fsm_objsize_100": int64(73596), + "node_get_fsm_objsize_95": int64(36663), + "node_get_fsm_objsize_99": int64(51552), + "node_get_fsm_objsize_mean": int64(13241), + "node_get_fsm_objsize_median": int64(10365), + "node_get_fsm_siblings_100": int64(1), + "node_get_fsm_siblings_95": int64(1), + "node_get_fsm_siblings_99": int64(1), + "node_get_fsm_siblings_mean": int64(1), + "node_get_fsm_siblings_median": int64(1), + "node_get_fsm_time_100": int64(230445), + "node_get_fsm_time_95": int64(24259), + "node_get_fsm_time_99": int64(96653), + "node_get_fsm_time_mean": int64(6851), + "node_get_fsm_time_median": int64(2368), + "node_gets": int64(1116), + "node_gets_total": int64(1026058217), + "node_put_fsm_time_100": int64(267390), + "node_put_fsm_time_95": int64(38286), + "node_put_fsm_time_99": int64(84422), + "node_put_fsm_time_mean": int64(10832), + "node_put_fsm_time_median": int64(4085), + "node_puts": int64(1155), + "node_puts_total": int64(444895769), + "pbc_active": int64(360), + "pbc_connects": int64(120), + "pbc_connects_total": int64(66793268), + "vnode_gets": int64(14629), + "vnode_gets_total": int64(3748432761), + "vnode_index_reads": int64(20), + "vnode_index_reads_total": int64(3438296), + "vnode_index_writes": int64(4293), + "vnode_index_writes_total": int64(1515986619), + "vnode_puts": int64(4308), + "vnode_puts_total": int64(1519062272), + } + + // Expect the correct values for all tags + expectTags := map[string]string{ + "nodename": "riak@127.0.0.1", + "server": u.Host, + } + + acc.AssertContainsTaggedFields(t, "riak", expectFields, expectTags) +} + +var response = ` +{ + "riak_kv_stat_ts": 1455908558, + "vnode_gets": 14629, + "vnode_gets_total": 3748432761, + "vnode_puts": 4308, + "vnode_puts_total": 1519062272, + "vnode_index_refreshes": 0, + "vnode_index_refreshes_total": 0, + "vnode_index_reads": 20, + "vnode_index_reads_total": 3438296, + "vnode_index_writes": 4293, + "vnode_index_writes_total": 1515986619, + "vnode_index_writes_postings": 1, + "vnode_index_writes_postings_total": 265613, + "vnode_index_deletes": 0, + "vnode_index_deletes_total": 0, + "vnode_index_deletes_postings": 0, + "vnode_index_deletes_postings_total": 1, + "node_gets": 1116, + "node_gets_total": 1026058217, + "node_get_fsm_siblings_mean": 1, + "node_get_fsm_siblings_median": 1, + "node_get_fsm_siblings_95": 1, + "node_get_fsm_siblings_99": 1, + "node_get_fsm_siblings_100": 1, + "node_get_fsm_objsize_mean": 13241, + "node_get_fsm_objsize_median": 10365, + "node_get_fsm_objsize_95": 36663, + "node_get_fsm_objsize_99": 51552, + "node_get_fsm_objsize_100": 73596, + "node_get_fsm_time_mean": 6851, + "node_get_fsm_time_median": 2368, + "node_get_fsm_time_95": 24259, + "node_get_fsm_time_99": 96653, + "node_get_fsm_time_100": 230445, + "node_puts": 1155, + "node_puts_total": 444895769, + "node_put_fsm_time_mean": 10832, + "node_put_fsm_time_median": 4085, + "node_put_fsm_time_95": 38286, + "node_put_fsm_time_99": 84422, + "node_put_fsm_time_100": 267390, + "read_repairs": 2, + "read_repairs_total": 7918375, + "coord_redirs_total": 118238575, + "executing_mappers": 0, + "precommit_fail": 0, + "postcommit_fail": 0, + "index_fsm_create": 0, + "index_fsm_create_error": 0, + "index_fsm_active": 0, + "list_fsm_create": 0, + "list_fsm_create_error": 0, + "list_fsm_active": 0, + "pbc_active": 360, + "pbc_connects": 120, + "pbc_connects_total": 66793268, + "late_put_fsm_coordinator_ack": 152, + "node_get_fsm_active": 1, + "node_get_fsm_active_60s": 1029, + "node_get_fsm_in_rate": 21, + "node_get_fsm_out_rate": 21, + "node_get_fsm_rejected": 0, + "node_get_fsm_rejected_60s": 0, + "node_get_fsm_rejected_total": 0, + "node_put_fsm_active": 69, + "node_put_fsm_active_60s": 1053, + "node_put_fsm_in_rate": 30, + "node_put_fsm_out_rate": 31, + "node_put_fsm_rejected": 0, + "node_put_fsm_rejected_60s": 0, + "node_put_fsm_rejected_total": 0, + "read_repairs_primary_outofdate_one": 4, + "read_repairs_primary_outofdate_count": 14761552, + "read_repairs_primary_notfound_one": 0, + "read_repairs_primary_notfound_count": 65879, + "read_repairs_fallback_outofdate_one": 0, + "read_repairs_fallback_outofdate_count": 23761, + "read_repairs_fallback_notfound_one": 0, + "read_repairs_fallback_notfound_count": 455697, + "leveldb_read_block_error": 0, + "riak_pipe_stat_ts": 1455908558, + "pipeline_active": 0, + "pipeline_create_count": 0, + "pipeline_create_one": 0, + "pipeline_create_error_count": 0, + "pipeline_create_error_one": 0, + "cpu_nprocs": 362, + "cpu_avg1": 504, + "cpu_avg5": 325, + "cpu_avg15": 294, + "mem_total": 33695432704, + "mem_allocated": 33454874624, + "nodename": "riak@127.0.0.1", + "connected_nodes": [], + "sys_driver_version": "2.0", + "sys_global_heaps_size": 0, + "sys_heap_type": "private", + "sys_logical_processors": 8, + "sys_otp_release": "R15B01", + "sys_process_count": 2201, + "sys_smp_support": true, + "sys_system_version": "Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:8:8] [async-threads:64] [kernel-poll:true]", + "sys_system_architecture": "x86_64-unknown-linux-gnu", + "sys_threads_enabled": true, + "sys_thread_pool_size": 64, + "sys_wordsize": 8, + "ring_members": [ + "riak@127.0.0.1" + ], + "ring_num_partitions": 256, + "ring_ownership": "[{'riak@127.0.0.1',256}]", + "ring_creation_size": 256, + "storage_backend": "riak_kv_eleveldb_backend", + "erlydtl_version": "0.7.0", + "riak_control_version": "1.4.12-0-g964c5db", + "cluster_info_version": "1.2.4", + "riak_search_version": "1.4.12-0-g7fe0e00", + "merge_index_version": "1.3.2-0-gcb38ee7", + "riak_kv_version": "1.4.12-0-gc6bbd66", + "sidejob_version": "0.2.0", + "riak_api_version": "1.4.12-0-gd9e1cc8", + "riak_pipe_version": "1.4.12-0-g986a226", + "riak_core_version": "1.4.10", + "bitcask_version": "1.6.8-0-gea14cb0", + "basho_stats_version": "1.0.3", + "webmachine_version": "1.10.4-0-gfcff795", + "mochiweb_version": "1.5.1p6", + "inets_version": "5.9", + "erlang_js_version": "1.2.2", + "runtime_tools_version": "1.8.8", + "os_mon_version": "2.2.9", + "riak_sysmon_version": "1.1.3", + "ssl_version": "5.0.1", + "public_key_version": "0.15", + "crypto_version": "2.1", + "sasl_version": "2.2.1", + "lager_version": "2.0.1", + "goldrush_version": "0.1.5", + "compiler_version": "4.8.1", + "syntax_tools_version": "1.6.8", + "stdlib_version": "1.18.1", + "kernel_version": "2.15.1", + "memory_total": 178856408, + "memory_processes": 58454730, + "memory_processes_used": 58371238, + "memory_system": 120401678, + "memory_atom": 586345, + "memory_atom_used": 563485, + "memory_binary": 48677920, + "memory_code": 12329143, + "memory_ets": 17330176, + "riak_core_stat_ts": 1455908559, + "ignored_gossip_total": 0, + "rings_reconciled_total": 5459, + "rings_reconciled": 0, + "gossip_received": 6, + "rejected_handoffs": 94, + "handoff_timeouts": 0, + "dropped_vnode_requests_total": 0, + "converge_delay_min": 0, + "converge_delay_max": 0, + "converge_delay_mean": 0, + "converge_delay_last": 0, + "rebalance_delay_min": 0, + "rebalance_delay_max": 0, + "rebalance_delay_mean": 0, + "rebalance_delay_last": 0, + "riak_kv_vnodes_running": 16, + "riak_kv_vnodeq_min": 0, + "riak_kv_vnodeq_median": 0, + "riak_kv_vnodeq_mean": 0, + "riak_kv_vnodeq_max": 0, + "riak_kv_vnodeq_total": 0, + "riak_pipe_vnodes_running": 16, + "riak_pipe_vnodeq_min": 0, + "riak_pipe_vnodeq_median": 0, + "riak_pipe_vnodeq_mean": 0, + "riak_pipe_vnodeq_max": 0, + "riak_pipe_vnodeq_total": 0 +} +`