Add redis plugin

This commit is contained in:
Evan Phoenix 2015-04-07 11:54:21 -07:00
parent 81f41059f4
commit 04b2bbd30b
5 changed files with 335 additions and 0 deletions

View File

@ -1,5 +1,6 @@
package all package all
import ( import (
_ "github.com/influxdb/tivan/plugins/redis"
_ "github.com/influxdb/tivan/plugins/system" _ "github.com/influxdb/tivan/plugins/system"
) )

139
plugins/redis/gatherer.go Normal file
View File

@ -0,0 +1,139 @@
package redis
import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"strings"
"github.com/influxdb/tivan/plugins"
)
type Gatherer struct {
Disabled bool
Address string
c net.Conn
buf []byte
}
var Tracking = map[string]string{
"uptime_in_seconds": "redis_uptime",
"connected_clients": "redis_clients",
"used_memory": "redis_used_memory",
"used_memory_rss": "redis_used_memory_rss",
"used_memory_peak": "redis_used_memory_peak",
"used_memory_lua": "redis_used_memory_lua",
"rdb_changes_since_last_save": "redis_rdb_changes_since_last_save",
"total_connections_received": "redis_total_connections_received",
"total_commands_processed": "redis_total_commands_processed",
"instantaneous_ops_per_sec": "redis_instantaneous_ops_per_sec",
"sync_full": "redis_sync_full",
"sync_partial_ok": "redis_sync_partial_ok",
"sync_partial_err": "redis_sync_partial_err",
"expired_keys": "redis_expired_keys",
"evicted_keys": "redis_evicted_keys",
"keyspace_hits": "redis_keyspace_hits",
"keyspace_misses": "redis_keyspace_misses",
"pubsub_channels": "redis_pubsub_channels",
"pubsub_patterns": "redis_pubsub_patterns",
"latest_fork_usec": "redis_latest_fork_usec",
"connected_slaves": "redis_connected_slaves",
"master_repl_offset": "redis_master_repl_offset",
"repl_backlog_active": "redis_repl_backlog_active",
"repl_backlog_size": "redis_repl_backlog_size",
"repl_backlog_histlen": "redis_repl_backlog_histlen",
"mem_fragmentation_ratio": "redis_mem_fragmentation_ratio",
"used_cpu_sys": "redis_used_cpu_sys",
"used_cpu_user": "redis_used_cpu_user",
"used_cpu_sys_children": "redis_used_cpu_sys_children",
"used_cpu_user_children": "redis_used_cpu_user_children",
}
var ErrProtocolError = errors.New("redis protocol error")
func (g *Gatherer) Gather(acc plugins.Accumulator) error {
if g.Address == "" || g.Disabled {
return nil
}
if g.c == nil {
c, err := net.Dial("tcp", g.Address)
if err != nil {
return err
}
g.c = c
}
g.c.Write([]byte("info\n"))
r := bufio.NewReader(g.c)
line, err := r.ReadString('\n')
if err != nil {
return err
}
if line[0] != '$' {
return fmt.Errorf("bad line start: %s", ErrProtocolError)
}
line = strings.TrimSpace(line)
szStr := line[1:]
sz, err := strconv.Atoi(szStr)
if err != nil {
return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError)
}
var read int
for read < sz {
line, err := r.ReadString('\n')
if err != nil {
return err
}
read += len(line)
if len(line) == 1 || line[0] == '#' {
continue
}
parts := strings.SplitN(line, ":", 2)
name := string(parts[0])
metric, ok := Tracking[name]
if !ok {
continue
}
val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64)
if err == nil {
acc.Add(metric, ival, nil)
continue
}
fval, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
acc.Add(metric, fval, nil)
}
return nil
}
func init() {
plugins.Add("redis", func() plugins.Plugin {
return &Gatherer{}
})
}

189
plugins/redis/redis_test.go Normal file
View File

@ -0,0 +1,189 @@
package redis
import (
"bufio"
"fmt"
"net"
"testing"
"github.com/influxdb/tivan/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRedisGeneratesMetrics(t *testing.T) {
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer l.Close()
go func() {
c, err := l.Accept()
if err != nil {
return
}
buf := bufio.NewReader(c)
for {
line, err := buf.ReadString('\n')
if err != nil {
return
}
if line != "info\n" {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()
addr := l.Addr().String()
r := &Gatherer{
Address: addr,
}
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
checkInt := []struct {
name string
value uint64
}{
{"redis_uptime", 238},
{"redis_clients", 1},
{"redis_used_memory", 1003936},
{"redis_used_memory_rss", 811008},
{"redis_used_memory_peak", 1003936},
{"redis_used_memory_lua", 33792},
{"redis_rdb_changes_since_last_save", 0},
{"redis_total_connections_received", 2},
{"redis_total_commands_processed", 1},
{"redis_instantaneous_ops_per_sec", 0},
{"redis_sync_full", 0},
{"redis_sync_partial_ok", 0},
{"redis_sync_partial_err", 0},
{"redis_expired_keys", 0},
{"redis_evicted_keys", 0},
{"redis_keyspace_hits", 0},
{"redis_keyspace_misses", 0},
{"redis_pubsub_channels", 0},
{"redis_pubsub_patterns", 0},
{"redis_latest_fork_usec", 0},
{"redis_connected_slaves", 0},
{"redis_master_repl_offset", 0},
{"redis_repl_backlog_active", 0},
{"redis_repl_backlog_size", 1048576},
{"redis_repl_backlog_histlen", 0},
}
for _, c := range checkInt {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
}
checkFloat := []struct {
name string
value float64
}{
{"redis_mem_fragmentation_ratio", 0.81},
{"redis_used_cpu_sys", 0.14},
{"redis_used_cpu_user", 0.05},
{"redis_used_cpu_sys_children", 0.00},
{"redis_used_cpu_user_children", 0.00},
}
for _, c := range checkFloat {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
}
}
const testOutput = `# Server
redis_version:2.8.9
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:9ccc8119ea98f6e1
redis_mode:standalone
os:Darwin 14.1.0 x86_64
arch_bits:64
multiplexing_api:kqueue
gcc_version:4.2.1
process_id:40235
run_id:37d020620aadf0627282c0f3401405d774a82664
tcp_port:6379
uptime_in_seconds:238
uptime_in_days:0
hz:10
lru_clock:2364819
config_file:/usr/local/etc/redis.conf
# Clients
connected_clients:1
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0
# Memory
used_memory:1003936
used_memory_human:980.41K
used_memory_rss:811008
used_memory_peak:1003936
used_memory_peak_human:980.41K
used_memory_lua:33792
mem_fragmentation_ratio:0.81
mem_allocator:libc
# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1428427941
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:-1
rdb_current_bgsave_time_sec:-1
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
# Stats
total_connections_received:2
total_commands_processed:1
instantaneous_ops_per_sec:0
rejected_connections:0
sync_full:0
sync_partial_ok:0
sync_partial_err:0
expired_keys:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:0
pubsub_patterns:0
latest_fork_usec:0
# Replication
role:master
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
# CPU
used_cpu_sys:0.14
used_cpu_user:0.05
used_cpu_sys_children:0.00
used_cpu_user_children:0.00
# Keyspace
`

View File

@ -10,3 +10,5 @@ password = "root"
database = "tivan" database = "tivan"
tags = { dc = "us-phx-1" } tags = { dc = "us-phx-1" }
[redis]
address = ":6379"

View File

@ -56,3 +56,7 @@ func (a *Accumulator) ValidateTaggedValue(name string, val interface{}, tags map
return fmt.Errorf("unknown value %s with tags %v", name, tags) return fmt.Errorf("unknown value %s with tags %v", name, tags)
} }
func (a *Accumulator) ValidateValue(name string, val interface{}) error {
return a.ValidateTaggedValue(name, val, nil)
}