diff --git a/plugins/all/all.go b/plugins/all/all.go index 602d919d6..a8b345954 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,5 +1,6 @@ package all import ( + _ "github.com/influxdb/tivan/plugins/redis" _ "github.com/influxdb/tivan/plugins/system" ) diff --git a/plugins/redis/gatherer.go b/plugins/redis/gatherer.go new file mode 100644 index 000000000..1acb5820a --- /dev/null +++ b/plugins/redis/gatherer.go @@ -0,0 +1,139 @@ +package redis + +import ( + "bufio" + "errors" + "fmt" + "net" + "strconv" + "strings" + + "github.com/influxdb/tivan/plugins" +) + +type Gatherer struct { + Disabled bool + Address string + + c net.Conn + buf []byte +} + +var Tracking = map[string]string{ + "uptime_in_seconds": "redis_uptime", + "connected_clients": "redis_clients", + "used_memory": "redis_used_memory", + "used_memory_rss": "redis_used_memory_rss", + "used_memory_peak": "redis_used_memory_peak", + "used_memory_lua": "redis_used_memory_lua", + "rdb_changes_since_last_save": "redis_rdb_changes_since_last_save", + "total_connections_received": "redis_total_connections_received", + "total_commands_processed": "redis_total_commands_processed", + "instantaneous_ops_per_sec": "redis_instantaneous_ops_per_sec", + "sync_full": "redis_sync_full", + "sync_partial_ok": "redis_sync_partial_ok", + "sync_partial_err": "redis_sync_partial_err", + "expired_keys": "redis_expired_keys", + "evicted_keys": "redis_evicted_keys", + "keyspace_hits": "redis_keyspace_hits", + "keyspace_misses": "redis_keyspace_misses", + "pubsub_channels": "redis_pubsub_channels", + "pubsub_patterns": "redis_pubsub_patterns", + "latest_fork_usec": "redis_latest_fork_usec", + "connected_slaves": "redis_connected_slaves", + "master_repl_offset": "redis_master_repl_offset", + "repl_backlog_active": "redis_repl_backlog_active", + "repl_backlog_size": "redis_repl_backlog_size", + "repl_backlog_histlen": "redis_repl_backlog_histlen", + "mem_fragmentation_ratio": "redis_mem_fragmentation_ratio", + "used_cpu_sys": "redis_used_cpu_sys", + "used_cpu_user": "redis_used_cpu_user", + "used_cpu_sys_children": "redis_used_cpu_sys_children", + "used_cpu_user_children": "redis_used_cpu_user_children", +} + +var ErrProtocolError = errors.New("redis protocol error") + +func (g *Gatherer) Gather(acc plugins.Accumulator) error { + if g.Address == "" || g.Disabled { + return nil + } + + if g.c == nil { + c, err := net.Dial("tcp", g.Address) + if err != nil { + return err + } + + g.c = c + } + + g.c.Write([]byte("info\n")) + + r := bufio.NewReader(g.c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + + if line[0] != '$' { + return fmt.Errorf("bad line start: %s", ErrProtocolError) + } + + line = strings.TrimSpace(line) + + szStr := line[1:] + + sz, err := strconv.Atoi(szStr) + if err != nil { + return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError) + } + + var read int + + for read < sz { + line, err := r.ReadString('\n') + if err != nil { + return err + } + + read += len(line) + + if len(line) == 1 || line[0] == '#' { + continue + } + + parts := strings.SplitN(line, ":", 2) + + name := string(parts[0]) + + metric, ok := Tracking[name] + if !ok { + continue + } + + val := strings.TrimSpace(parts[1]) + + ival, err := strconv.ParseUint(val, 10, 64) + if err == nil { + acc.Add(metric, ival, nil) + continue + } + + fval, err := strconv.ParseFloat(val, 64) + if err != nil { + return err + } + + acc.Add(metric, fval, nil) + } + + return nil +} + +func init() { + plugins.Add("redis", func() plugins.Plugin { + return &Gatherer{} + }) +} diff --git a/plugins/redis/redis_test.go b/plugins/redis/redis_test.go new file mode 100644 index 000000000..b7d3af11c --- /dev/null +++ b/plugins/redis/redis_test.go @@ -0,0 +1,189 @@ +package redis + +import ( + "bufio" + "fmt" + "net" + "testing" + + "github.com/influxdb/tivan/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRedisGeneratesMetrics(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := l.Addr().String() + + r := &Gatherer{ + Address: addr, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"redis_uptime", 238}, + {"redis_clients", 1}, + {"redis_used_memory", 1003936}, + {"redis_used_memory_rss", 811008}, + {"redis_used_memory_peak", 1003936}, + {"redis_used_memory_lua", 33792}, + {"redis_rdb_changes_since_last_save", 0}, + {"redis_total_connections_received", 2}, + {"redis_total_commands_processed", 1}, + {"redis_instantaneous_ops_per_sec", 0}, + {"redis_sync_full", 0}, + {"redis_sync_partial_ok", 0}, + {"redis_sync_partial_err", 0}, + {"redis_expired_keys", 0}, + {"redis_evicted_keys", 0}, + {"redis_keyspace_hits", 0}, + {"redis_keyspace_misses", 0}, + {"redis_pubsub_channels", 0}, + {"redis_pubsub_patterns", 0}, + {"redis_latest_fork_usec", 0}, + {"redis_connected_slaves", 0}, + {"redis_master_repl_offset", 0}, + {"redis_repl_backlog_active", 0}, + {"redis_repl_backlog_size", 1048576}, + {"redis_repl_backlog_histlen", 0}, + } + + for _, c := range checkInt { + assert.NoError(t, acc.ValidateValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"redis_mem_fragmentation_ratio", 0.81}, + {"redis_used_cpu_sys", 0.14}, + {"redis_used_cpu_user", 0.05}, + {"redis_used_cpu_sys_children", 0.00}, + {"redis_used_cpu_user_children", 0.00}, + } + + for _, c := range checkFloat { + assert.NoError(t, acc.ValidateValue(c.name, c.value)) + } +} + +const testOutput = `# Server +redis_version:2.8.9 +redis_git_sha1:00000000 +redis_git_dirty:0 +redis_build_id:9ccc8119ea98f6e1 +redis_mode:standalone +os:Darwin 14.1.0 x86_64 +arch_bits:64 +multiplexing_api:kqueue +gcc_version:4.2.1 +process_id:40235 +run_id:37d020620aadf0627282c0f3401405d774a82664 +tcp_port:6379 +uptime_in_seconds:238 +uptime_in_days:0 +hz:10 +lru_clock:2364819 +config_file:/usr/local/etc/redis.conf + +# Clients +connected_clients:1 +client_longest_output_list:0 +client_biggest_input_buf:0 +blocked_clients:0 + +# Memory +used_memory:1003936 +used_memory_human:980.41K +used_memory_rss:811008 +used_memory_peak:1003936 +used_memory_peak_human:980.41K +used_memory_lua:33792 +mem_fragmentation_ratio:0.81 +mem_allocator:libc + +# Persistence +loading:0 +rdb_changes_since_last_save:0 +rdb_bgsave_in_progress:0 +rdb_last_save_time:1428427941 +rdb_last_bgsave_status:ok +rdb_last_bgsave_time_sec:-1 +rdb_current_bgsave_time_sec:-1 +aof_enabled:0 +aof_rewrite_in_progress:0 +aof_rewrite_scheduled:0 +aof_last_rewrite_time_sec:-1 +aof_current_rewrite_time_sec:-1 +aof_last_bgrewrite_status:ok +aof_last_write_status:ok + +# Stats +total_connections_received:2 +total_commands_processed:1 +instantaneous_ops_per_sec:0 +rejected_connections:0 +sync_full:0 +sync_partial_ok:0 +sync_partial_err:0 +expired_keys:0 +evicted_keys:0 +keyspace_hits:0 +keyspace_misses:0 +pubsub_channels:0 +pubsub_patterns:0 +latest_fork_usec:0 + +# Replication +role:master +connected_slaves:0 +master_repl_offset:0 +repl_backlog_active:0 +repl_backlog_size:1048576 +repl_backlog_first_byte_offset:0 +repl_backlog_histlen:0 + +# CPU +used_cpu_sys:0.14 +used_cpu_user:0.05 +used_cpu_sys_children:0.00 +used_cpu_user_children:0.00 + +# Keyspace + +` diff --git a/testdata/influx.toml b/testdata/influx.toml index f91934be3..f29c48c80 100644 --- a/testdata/influx.toml +++ b/testdata/influx.toml @@ -10,3 +10,5 @@ password = "root" database = "tivan" tags = { dc = "us-phx-1" } +[redis] +address = ":6379" diff --git a/testutil/accumulator.go b/testutil/accumulator.go index ad594a7e3..98fcaca44 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -56,3 +56,7 @@ func (a *Accumulator) ValidateTaggedValue(name string, val interface{}, tags map return fmt.Errorf("unknown value %s with tags %v", name, tags) } + +func (a *Accumulator) ValidateValue(name string, val interface{}) error { + return a.ValidateTaggedValue(name, val, nil) +}