Add ability to query many redis servers

This commit is contained in:
Evan Phoenix 2015-05-18 09:26:10 -07:00
parent fc30ae6cc4
commit 5b9f7e7bf3
2 changed files with 127 additions and 6 deletions

View File

@ -7,13 +7,15 @@ import (
"net"
"strconv"
"strings"
"sync"
"github.com/influxdb/tivan/plugins"
)
type Gatherer struct {
type Redis struct {
Disabled bool
Address string
Servers []string
c net.Conn
buf []byte
@ -54,13 +56,41 @@ var Tracking = map[string]string{
var ErrProtocolError = errors.New("redis protocol error")
func (g *Gatherer) Gather(acc plugins.Accumulator) error {
if g.Address == "" || g.Disabled {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (g *Redis) Gather(acc plugins.Accumulator) error {
if g.Disabled {
return nil
}
var wg sync.WaitGroup
var outerr error
var servers []string
if g.Address != "" {
servers = append(servers, g.Address)
}
servers = append(servers, g.Servers...)
for _, serv := range servers {
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = g.gatherServer(serv, acc)
}(serv)
}
wg.Wait()
return outerr
}
func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error {
if g.c == nil {
c, err := net.Dial("tcp", g.Address)
c, err := net.Dial("tcp", addr)
if err != nil {
return err
}
@ -134,6 +164,6 @@ func (g *Gatherer) Gather(acc plugins.Accumulator) error {
func init() {
plugins.Add("redis", func() plugins.Plugin {
return &Gatherer{}
return &Redis{}
})
}

View File

@ -42,7 +42,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
addr := l.Addr().String()
r := &Gatherer{
r := &Redis{
Address: addr,
}
@ -102,6 +102,97 @@ func TestRedisGeneratesMetrics(t *testing.T) {
}
}
func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer l.Close()
go func() {
c, err := l.Accept()
if err != nil {
return
}
buf := bufio.NewReader(c)
for {
line, err := buf.ReadString('\n')
if err != nil {
return
}
if line != "info\n" {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()
addr := l.Addr().String()
r := &Redis{
Servers: []string{addr},
}
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
checkInt := []struct {
name string
value uint64
}{
{"redis_uptime", 238},
{"redis_clients", 1},
{"redis_used_memory", 1003936},
{"redis_used_memory_rss", 811008},
{"redis_used_memory_peak", 1003936},
{"redis_used_memory_lua", 33792},
{"redis_rdb_changes_since_last_save", 0},
{"redis_total_connections_received", 2},
{"redis_total_commands_processed", 1},
{"redis_instantaneous_ops_per_sec", 0},
{"redis_sync_full", 0},
{"redis_sync_partial_ok", 0},
{"redis_sync_partial_err", 0},
{"redis_expired_keys", 0},
{"redis_evicted_keys", 0},
{"redis_keyspace_hits", 0},
{"redis_keyspace_misses", 0},
{"redis_pubsub_channels", 0},
{"redis_pubsub_patterns", 0},
{"redis_latest_fork_usec", 0},
{"redis_connected_slaves", 0},
{"redis_master_repl_offset", 0},
{"redis_repl_backlog_active", 0},
{"redis_repl_backlog_size", 1048576},
{"redis_repl_backlog_histlen", 0},
}
for _, c := range checkInt {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
}
checkFloat := []struct {
name string
value float64
}{
{"redis_mem_fragmentation_ratio", 0.81},
{"redis_used_cpu_sys", 0.14},
{"redis_used_cpu_user", 0.05},
{"redis_used_cpu_sys_children", 0.00},
{"redis_used_cpu_user_children", 0.00},
}
for _, c := range checkFloat {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
}
}
const testOutput = `# Server
redis_version:2.8.9
redis_git_sha1:00000000