Add riak plugin

This commit is contained in:
Jason Coene 2016-02-19 14:30:40 -06:00
parent f9b5767dae
commit fa4cecf45d
5 changed files with 550 additions and 0 deletions

View File

@ -187,6 +187,7 @@ Currently implemented sources:
* raindrops * raindrops
* redis * redis
* rethinkdb * rethinkdb
* riak
* sql server (microsoft) * sql server (microsoft)
* twemproxy * twemproxy
* zfs * zfs

View File

@ -40,6 +40,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"

View File

@ -0,0 +1,76 @@
# Riak Plugin
The Riak plugin gathers metrics from one or more riak instances.
### Configuration:
```toml
# Read metrics from one or many Riak servers
[[inputs.riak]]
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
```
### Measurements & Fields:
Riak provides one measurement named "riak", with the following fields:
- cpu_avg1
- cpu_avg15
- cpu_avg5
- memory_code
- memory_ets
- memory_processes
- memory_system
- memory_total
- node_get_fsm_objsize_100
- node_get_fsm_objsize_95
- node_get_fsm_objsize_99
- node_get_fsm_objsize_mean
- node_get_fsm_objsize_median
- node_get_fsm_siblings_100
- node_get_fsm_siblings_95
- node_get_fsm_siblings_99
- node_get_fsm_siblings_mean
- node_get_fsm_siblings_median
- node_get_fsm_time_100
- node_get_fsm_time_95
- node_get_fsm_time_99
- node_get_fsm_time_mean
- node_get_fsm_time_median
- node_gets
- node_gets_total
- node_put_fsm_time_100
- node_put_fsm_time_95
- node_put_fsm_time_99
- node_put_fsm_time_mean
- node_put_fsm_time_median
- node_puts
- node_puts_total
- pbc_active
- pbc_connects
- pbc_connects_total
- vnode_gets
- vnode_gets_total
- vnode_index_reads
- vnode_index_reads_total
- vnode_index_writes
- vnode_index_writes_total
- vnode_puts
- vnode_puts_total
Measurements of time (such as node_get_fsm_time_mean) are measured in nanoseconds.
### Tags:
All measurements have the following tags:
- server (the host:port of the given server address, ex. `127.0.0.1:8098`)
- nodename (the internal node name received, ex. `riak@127.0.0.1`)
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter riak -test
> riak,nodename=riak@127.0.0.1,server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332
```

196
plugins/inputs/riak/riak.go Normal file
View File

@ -0,0 +1,196 @@
package riak
import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)
// Riak gathers statistics from one or more Riak instances.
type Riak struct {
	// Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098)
	Servers []string

	// client is the HTTP client used to query each server's /stats endpoint
	client *http.Client
}

// NewRiak returns a new instance of Riak with its own HTTP client.
//
// The client carries a request timeout so that a single unresponsive server
// cannot block a gather cycle indefinitely (http.DefaultClient never times out).
func NewRiak() *Riak {
	// Upper bound for one complete request: connect, send, and read the body
	const requestTimeout = 4 * time.Second

	return &Riak{client: &http.Client{Timeout: requestTimeout}}
}
// riakStats mirrors the subset of the Riak stats JSON document that this
// plugin reports. Keys present in the response but not listed here are
// ignored by the decoder.
type riakStats struct {
	CpuAvg1                  int64  `json:"cpu_avg1"`
	CpuAvg15                 int64  `json:"cpu_avg15"`
	CpuAvg5                  int64  `json:"cpu_avg5"`
	MemoryCode               int64  `json:"memory_code"`
	MemoryEts                int64  `json:"memory_ets"`
	MemoryProcesses          int64  `json:"memory_processes"`
	MemorySystem             int64  `json:"memory_system"`
	MemoryTotal              int64  `json:"memory_total"`
	NodeGetFsmObjsize100     int64  `json:"node_get_fsm_objsize_100"`
	NodeGetFsmObjsize95      int64  `json:"node_get_fsm_objsize_95"`
	NodeGetFsmObjsize99      int64  `json:"node_get_fsm_objsize_99"`
	NodeGetFsmObjsizeMean    int64  `json:"node_get_fsm_objsize_mean"`
	NodeGetFsmObjsizeMedian  int64  `json:"node_get_fsm_objsize_median"`
	NodeGetFsmSiblings100    int64  `json:"node_get_fsm_siblings_100"`
	NodeGetFsmSiblings95     int64  `json:"node_get_fsm_siblings_95"`
	NodeGetFsmSiblings99     int64  `json:"node_get_fsm_siblings_99"`
	NodeGetFsmSiblingsMean   int64  `json:"node_get_fsm_siblings_mean"`
	NodeGetFsmSiblingsMedian int64  `json:"node_get_fsm_siblings_median"`
	NodeGetFsmTime100        int64  `json:"node_get_fsm_time_100"`
	NodeGetFsmTime95         int64  `json:"node_get_fsm_time_95"`
	NodeGetFsmTime99         int64  `json:"node_get_fsm_time_99"`
	NodeGetFsmTimeMean       int64  `json:"node_get_fsm_time_mean"`
	NodeGetFsmTimeMedian     int64  `json:"node_get_fsm_time_median"`
	NodeGets                 int64  `json:"node_gets"`
	NodeGetsTotal            int64  `json:"node_gets_total"`
	Nodename                 string `json:"nodename"`
	NodePutFsmTime100        int64  `json:"node_put_fsm_time_100"`
	NodePutFsmTime95         int64  `json:"node_put_fsm_time_95"`
	NodePutFsmTime99         int64  `json:"node_put_fsm_time_99"`
	NodePutFsmTimeMean       int64  `json:"node_put_fsm_time_mean"`
	NodePutFsmTimeMedian     int64  `json:"node_put_fsm_time_median"`
	NodePuts                 int64  `json:"node_puts"`
	NodePutsTotal            int64  `json:"node_puts_total"`
	PbcActive                int64  `json:"pbc_active"`
	PbcConnects              int64  `json:"pbc_connects"`
	PbcConnectsTotal         int64  `json:"pbc_connects_total"`
	VnodeGets                int64  `json:"vnode_gets"`
	VnodeGetsTotal           int64  `json:"vnode_gets_total"`
	VnodeIndexReads          int64  `json:"vnode_index_reads"`
	VnodeIndexReadsTotal     int64  `json:"vnode_index_reads_total"`
	VnodeIndexWrites         int64  `json:"vnode_index_writes"`
	VnodeIndexWritesTotal    int64  `json:"vnode_index_writes_total"`
	VnodePuts                int64  `json:"vnode_puts"`
	VnodePutsTotal           int64  `json:"vnode_puts_total"`
}
// sampleConfig is the example configuration text returned by SampleConfig.
// It lists a single Riak HTTP server: localhost on port 8098.
const sampleConfig = `
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
`
// SampleConfig returns the example configuration snippet for this plugin.
func (r *Riak) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line, human-readable summary of the plugin.
func (r *Riak) Description() string {
	return "Read metrics from one or many Riak servers"
}
// Gather collects stats from every configured server in order and adds them
// to acc. The first server that fails aborts the run and its error is
// returned; servers after it are not queried.
func (r *Riak) Gather(acc telegraf.Accumulator) error {
	// With nothing configured, fall back to one local server on the default port
	if len(r.Servers) == 0 {
		r.Servers = []string{"http://127.0.0.1:8098"}
	}

	for i := range r.Servers {
		err := r.gatherServer(r.Servers[i], acc)
		if err != nil {
			return err
		}
	}

	return nil
}
// gatherServer fetches the /stats document from a single Riak server and adds
// the decoded values to the accumulator as one "riak" measurement.
//
// s is the server's base HTTP address (ex. http://127.0.0.1:8098). An error is
// returned if s cannot be parsed as a URL, the request fails, the server
// answers with anything other than 200 OK, or the body cannot be decoded into
// riakStats; in all of those cases nothing is added to acc.
func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
	// Parse the given URL to extract the server tag
	u, err := url.Parse(s)
	if err != nil {
		return fmt.Errorf("riak unable to parse given server url %s: %s", s, err)
	}

	// Fall back to the default client so that a Riak value that was not built
	// by NewRiak does not dereference a nil client
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}

	// Perform the GET request to the riak /stats endpoint. Trailing slashes are
	// trimmed first so "http://host:8098/" does not turn into ".../​/stats".
	resp, err := client.Get(strings.TrimRight(s, "/") + "/stats")
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Successful responses will always return status code 200
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("riak responded with unexpected status code %d", resp.StatusCode)
	}

	// Decode the response JSON into a new stats struct
	stats := &riakStats{}
	if err := json.NewDecoder(resp.Body).Decode(stats); err != nil {
		return fmt.Errorf("unable to decode riak response: %s", err)
	}

	// Tags: the node name reported by Riak itself, and the host[:port] part of
	// the configured address
	tags := map[string]string{
		"nodename": stats.Nodename,
		"server":   u.Host,
	}

	// Build a map of field values
	fields := map[string]interface{}{
		"cpu_avg1":                     stats.CpuAvg1,
		"cpu_avg15":                    stats.CpuAvg15,
		"cpu_avg5":                     stats.CpuAvg5,
		"memory_code":                  stats.MemoryCode,
		"memory_ets":                   stats.MemoryEts,
		"memory_processes":             stats.MemoryProcesses,
		"memory_system":                stats.MemorySystem,
		"memory_total":                 stats.MemoryTotal,
		"node_get_fsm_objsize_100":     stats.NodeGetFsmObjsize100,
		"node_get_fsm_objsize_95":      stats.NodeGetFsmObjsize95,
		"node_get_fsm_objsize_99":      stats.NodeGetFsmObjsize99,
		"node_get_fsm_objsize_mean":    stats.NodeGetFsmObjsizeMean,
		"node_get_fsm_objsize_median":  stats.NodeGetFsmObjsizeMedian,
		"node_get_fsm_siblings_100":    stats.NodeGetFsmSiblings100,
		"node_get_fsm_siblings_95":     stats.NodeGetFsmSiblings95,
		"node_get_fsm_siblings_99":     stats.NodeGetFsmSiblings99,
		"node_get_fsm_siblings_mean":   stats.NodeGetFsmSiblingsMean,
		"node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian,
		"node_get_fsm_time_100":        stats.NodeGetFsmTime100,
		"node_get_fsm_time_95":         stats.NodeGetFsmTime95,
		"node_get_fsm_time_99":         stats.NodeGetFsmTime99,
		"node_get_fsm_time_mean":       stats.NodeGetFsmTimeMean,
		"node_get_fsm_time_median":     stats.NodeGetFsmTimeMedian,
		"node_gets":                    stats.NodeGets,
		"node_gets_total":              stats.NodeGetsTotal,
		"node_put_fsm_time_100":        stats.NodePutFsmTime100,
		"node_put_fsm_time_95":         stats.NodePutFsmTime95,
		"node_put_fsm_time_99":         stats.NodePutFsmTime99,
		"node_put_fsm_time_mean":       stats.NodePutFsmTimeMean,
		"node_put_fsm_time_median":     stats.NodePutFsmTimeMedian,
		"node_puts":                    stats.NodePuts,
		"node_puts_total":              stats.NodePutsTotal,
		"pbc_active":                   stats.PbcActive,
		"pbc_connects":                 stats.PbcConnects,
		"pbc_connects_total":           stats.PbcConnectsTotal,
		"vnode_gets":                   stats.VnodeGets,
		"vnode_gets_total":             stats.VnodeGetsTotal,
		"vnode_index_reads":            stats.VnodeIndexReads,
		"vnode_index_reads_total":      stats.VnodeIndexReadsTotal,
		"vnode_index_writes":           stats.VnodeIndexWrites,
		"vnode_index_writes_total":     stats.VnodeIndexWritesTotal,
		"vnode_puts":                   stats.VnodePuts,
		"vnode_puts_total":             stats.VnodePutsTotal,
	}

	// Accumulate the tags and values
	acc.AddFields("riak", fields, tags)

	return nil
}
func init() {
inputs.Add("riak", func() telegraf.Input {
return NewRiak()
})
}

View File

@ -0,0 +1,276 @@
package riak
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestRiak serves the canned stats payload from a local HTTP server and checks
// that Gather reports every expected field and tag for it.
func TestRiak(t *testing.T) {
	// Test server that answers every request with 200 and the canned JSON
	handler := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, response)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	// The host part of the test server URL is what the "server" tag should hold
	parsed, err := url.Parse(srv.URL)
	require.NoError(t, err)

	// Point a fresh plugin instance at the test server
	plugin := NewRiak()
	plugin.Servers = []string{srv.URL}

	// Gather into a test accumulator
	acc := new(testutil.Accumulator)
	require.NoError(t, plugin.Gather(acc))

	// Every reported field, with the value it has in the canned payload
	wantFields := map[string]interface{}{
		"cpu_avg1":                     int64(504),
		"cpu_avg15":                    int64(294),
		"cpu_avg5":                     int64(325),
		"memory_code":                  int64(12329143),
		"memory_ets":                   int64(17330176),
		"memory_processes":             int64(58454730),
		"memory_system":                int64(120401678),
		"memory_total":                 int64(178856408),
		"node_get_fsm_objsize_100":     int64(73596),
		"node_get_fsm_objsize_95":      int64(36663),
		"node_get_fsm_objsize_99":      int64(51552),
		"node_get_fsm_objsize_mean":    int64(13241),
		"node_get_fsm_objsize_median":  int64(10365),
		"node_get_fsm_siblings_100":    int64(1),
		"node_get_fsm_siblings_95":     int64(1),
		"node_get_fsm_siblings_99":     int64(1),
		"node_get_fsm_siblings_mean":   int64(1),
		"node_get_fsm_siblings_median": int64(1),
		"node_get_fsm_time_100":        int64(230445),
		"node_get_fsm_time_95":         int64(24259),
		"node_get_fsm_time_99":         int64(96653),
		"node_get_fsm_time_mean":       int64(6851),
		"node_get_fsm_time_median":     int64(2368),
		"node_gets":                    int64(1116),
		"node_gets_total":              int64(1026058217),
		"node_put_fsm_time_100":        int64(267390),
		"node_put_fsm_time_95":         int64(38286),
		"node_put_fsm_time_99":         int64(84422),
		"node_put_fsm_time_mean":       int64(10832),
		"node_put_fsm_time_median":     int64(4085),
		"node_puts":                    int64(1155),
		"node_puts_total":              int64(444895769),
		"pbc_active":                   int64(360),
		"pbc_connects":                 int64(120),
		"pbc_connects_total":           int64(66793268),
		"vnode_gets":                   int64(14629),
		"vnode_gets_total":             int64(3748432761),
		"vnode_index_reads":            int64(20),
		"vnode_index_reads_total":      int64(3438296),
		"vnode_index_writes":           int64(4293),
		"vnode_index_writes_total":     int64(1515986619),
		"vnode_puts":                   int64(4308),
		"vnode_puts_total":             int64(1519062272),
	}

	// Both tags: the node name from the payload and the test server's host
	wantTags := map[string]string{
		"nodename": "riak@127.0.0.1",
		"server":   parsed.Host,
	}

	acc.AssertContainsTaggedFields(t, "riak", wantFields, wantTags)
}
// response is the canned JSON stats document that the test HTTP server in
// TestRiak writes back for every request. It contains many more keys than the
// plugin reports.
var response = `
{
"riak_kv_stat_ts": 1455908558,
"vnode_gets": 14629,
"vnode_gets_total": 3748432761,
"vnode_puts": 4308,
"vnode_puts_total": 1519062272,
"vnode_index_refreshes": 0,
"vnode_index_refreshes_total": 0,
"vnode_index_reads": 20,
"vnode_index_reads_total": 3438296,
"vnode_index_writes": 4293,
"vnode_index_writes_total": 1515986619,
"vnode_index_writes_postings": 1,
"vnode_index_writes_postings_total": 265613,
"vnode_index_deletes": 0,
"vnode_index_deletes_total": 0,
"vnode_index_deletes_postings": 0,
"vnode_index_deletes_postings_total": 1,
"node_gets": 1116,
"node_gets_total": 1026058217,
"node_get_fsm_siblings_mean": 1,
"node_get_fsm_siblings_median": 1,
"node_get_fsm_siblings_95": 1,
"node_get_fsm_siblings_99": 1,
"node_get_fsm_siblings_100": 1,
"node_get_fsm_objsize_mean": 13241,
"node_get_fsm_objsize_median": 10365,
"node_get_fsm_objsize_95": 36663,
"node_get_fsm_objsize_99": 51552,
"node_get_fsm_objsize_100": 73596,
"node_get_fsm_time_mean": 6851,
"node_get_fsm_time_median": 2368,
"node_get_fsm_time_95": 24259,
"node_get_fsm_time_99": 96653,
"node_get_fsm_time_100": 230445,
"node_puts": 1155,
"node_puts_total": 444895769,
"node_put_fsm_time_mean": 10832,
"node_put_fsm_time_median": 4085,
"node_put_fsm_time_95": 38286,
"node_put_fsm_time_99": 84422,
"node_put_fsm_time_100": 267390,
"read_repairs": 2,
"read_repairs_total": 7918375,
"coord_redirs_total": 118238575,
"executing_mappers": 0,
"precommit_fail": 0,
"postcommit_fail": 0,
"index_fsm_create": 0,
"index_fsm_create_error": 0,
"index_fsm_active": 0,
"list_fsm_create": 0,
"list_fsm_create_error": 0,
"list_fsm_active": 0,
"pbc_active": 360,
"pbc_connects": 120,
"pbc_connects_total": 66793268,
"late_put_fsm_coordinator_ack": 152,
"node_get_fsm_active": 1,
"node_get_fsm_active_60s": 1029,
"node_get_fsm_in_rate": 21,
"node_get_fsm_out_rate": 21,
"node_get_fsm_rejected": 0,
"node_get_fsm_rejected_60s": 0,
"node_get_fsm_rejected_total": 0,
"node_put_fsm_active": 69,
"node_put_fsm_active_60s": 1053,
"node_put_fsm_in_rate": 30,
"node_put_fsm_out_rate": 31,
"node_put_fsm_rejected": 0,
"node_put_fsm_rejected_60s": 0,
"node_put_fsm_rejected_total": 0,
"read_repairs_primary_outofdate_one": 4,
"read_repairs_primary_outofdate_count": 14761552,
"read_repairs_primary_notfound_one": 0,
"read_repairs_primary_notfound_count": 65879,
"read_repairs_fallback_outofdate_one": 0,
"read_repairs_fallback_outofdate_count": 23761,
"read_repairs_fallback_notfound_one": 0,
"read_repairs_fallback_notfound_count": 455697,
"leveldb_read_block_error": 0,
"riak_pipe_stat_ts": 1455908558,
"pipeline_active": 0,
"pipeline_create_count": 0,
"pipeline_create_one": 0,
"pipeline_create_error_count": 0,
"pipeline_create_error_one": 0,
"cpu_nprocs": 362,
"cpu_avg1": 504,
"cpu_avg5": 325,
"cpu_avg15": 294,
"mem_total": 33695432704,
"mem_allocated": 33454874624,
"nodename": "riak@127.0.0.1",
"connected_nodes": [],
"sys_driver_version": "2.0",
"sys_global_heaps_size": 0,
"sys_heap_type": "private",
"sys_logical_processors": 8,
"sys_otp_release": "R15B01",
"sys_process_count": 2201,
"sys_smp_support": true,
"sys_system_version": "Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:8:8] [async-threads:64] [kernel-poll:true]",
"sys_system_architecture": "x86_64-unknown-linux-gnu",
"sys_threads_enabled": true,
"sys_thread_pool_size": 64,
"sys_wordsize": 8,
"ring_members": [
"riak@127.0.0.1"
],
"ring_num_partitions": 256,
"ring_ownership": "[{'riak@127.0.0.1',256}]",
"ring_creation_size": 256,
"storage_backend": "riak_kv_eleveldb_backend",
"erlydtl_version": "0.7.0",
"riak_control_version": "1.4.12-0-g964c5db",
"cluster_info_version": "1.2.4",
"riak_search_version": "1.4.12-0-g7fe0e00",
"merge_index_version": "1.3.2-0-gcb38ee7",
"riak_kv_version": "1.4.12-0-gc6bbd66",
"sidejob_version": "0.2.0",
"riak_api_version": "1.4.12-0-gd9e1cc8",
"riak_pipe_version": "1.4.12-0-g986a226",
"riak_core_version": "1.4.10",
"bitcask_version": "1.6.8-0-gea14cb0",
"basho_stats_version": "1.0.3",
"webmachine_version": "1.10.4-0-gfcff795",
"mochiweb_version": "1.5.1p6",
"inets_version": "5.9",
"erlang_js_version": "1.2.2",
"runtime_tools_version": "1.8.8",
"os_mon_version": "2.2.9",
"riak_sysmon_version": "1.1.3",
"ssl_version": "5.0.1",
"public_key_version": "0.15",
"crypto_version": "2.1",
"sasl_version": "2.2.1",
"lager_version": "2.0.1",
"goldrush_version": "0.1.5",
"compiler_version": "4.8.1",
"syntax_tools_version": "1.6.8",
"stdlib_version": "1.18.1",
"kernel_version": "2.15.1",
"memory_total": 178856408,
"memory_processes": 58454730,
"memory_processes_used": 58371238,
"memory_system": 120401678,
"memory_atom": 586345,
"memory_atom_used": 563485,
"memory_binary": 48677920,
"memory_code": 12329143,
"memory_ets": 17330176,
"riak_core_stat_ts": 1455908559,
"ignored_gossip_total": 0,
"rings_reconciled_total": 5459,
"rings_reconciled": 0,
"gossip_received": 6,
"rejected_handoffs": 94,
"handoff_timeouts": 0,
"dropped_vnode_requests_total": 0,
"converge_delay_min": 0,
"converge_delay_max": 0,
"converge_delay_mean": 0,
"converge_delay_last": 0,
"rebalance_delay_min": 0,
"rebalance_delay_max": 0,
"rebalance_delay_mean": 0,
"rebalance_delay_last": 0,
"riak_kv_vnodes_running": 16,
"riak_kv_vnodeq_min": 0,
"riak_kv_vnodeq_median": 0,
"riak_kv_vnodeq_mean": 0,
"riak_kv_vnodeq_max": 0,
"riak_kv_vnodeq_total": 0,
"riak_pipe_vnodes_running": 16,
"riak_pipe_vnodeq_min": 0,
"riak_pipe_vnodeq_median": 0,
"riak_pipe_vnodeq_mean": 0,
"riak_pipe_vnodeq_max": 0,
"riak_pipe_vnodeq_total": 0
}
`