Redis: include per-db keyspace info

Closes #205
This commit is contained in:
Cameron Sparr 2015-09-22 18:13:35 -07:00
parent b92a0d5126
commit f8d64a7378
7 changed files with 111 additions and 203 deletions

View File

@ -1,3 +1,12 @@
## v0.1.10 [unreleased]
### Release Notes
### Features
- [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info
### Bugfixes
## v0.1.9 [2015-09-22]
### Release Notes
@ -14,7 +23,9 @@ file with only the cpu plugin defined, and the influxdb output defined.
- **Breaking Change**: The CPU collection plugin has been refactored to fix some
bugs and outdated dependency issues. At the same time, I also decided to fix
a naming consistency issue, so cpu_percentageIdle will become cpu_usage_idle.
Also, all CPU time measurements now have it indicated in their name, so cpu_idle will become cpu_time_idle. Additionally, cpu_time measurements are going to be dropped in the default config.
Also, all CPU time measurements now have it indicated in their name, so cpu_idle
will become cpu_time_idle. Additionally, cpu_time measurements are going to be
dropped in the default config.
- **Breaking Change**: The memory plugin has been refactored and some measurements
have been renamed for consistency. Some measurements have also been removed from being outputted. They are still being collected by gopsutil, and could easily be
re-added in a "verbose" mode if there is demand for it.

View File

@ -33,7 +33,7 @@ ifeq ($(UNAME), Linux)
endif
test: prepare docker-compose
$(GOBIN)/godep go test -v ./...
$(GOBIN)/godep go test ./...
test-short: prepare
$(GOBIN)/godep go test -short ./...

View File

@ -6,7 +6,7 @@ import (
"fmt"
"net"
"net/url"
// "strconv"
"strconv"
"strings"
"sync"
@ -137,88 +137,90 @@ func (r *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
}
}
c.Write([]byte("info\r\n"))
c.Write([]byte("INFO\r\n"))
c.Write([]byte("EOF\r\n"))
rdr := bufio.NewReader(c)
// Setup tags for all redis metrics
_, rPort, err := net.SplitHostPort(addr.Host)
if err != nil {
rPort = defaultPort
}
tags := map[string]string{"host": addr.String(), "port": rPort}
return gatherInfoOutput(rdr, acc, tags)
}
// gatherInfoOutput gathers
func gatherInfoOutput(
rdr *bufio.Reader,
acc plugins.Accumulator,
tags map[string]string,
) error {
scanner := bufio.NewScanner(rdr)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
fmt.Println("reading standard input:", err)
line := scanner.Text()
if strings.Contains(line, "ERR") {
break
}
// line, err := rdr.ReadString('\n')
// if err != nil {
// return err
// }
if len(line) == 0 || line[0] == '#' {
continue
}
// if line[0] != '$' {
// return fmt.Errorf("bad line start: %s", ErrProtocolError)
// }
parts := strings.SplitN(line, ":", 2)
if len(parts) < 2 {
continue
}
// line = strings.TrimSpace(line)
name := string(parts[0])
metric, ok := Tracking[name]
if !ok {
kline := strings.TrimSpace(string(parts[1]))
gatherKeyspaceLine(name, kline, acc, tags)
continue
}
// szStr := line[0:]
val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64)
if err == nil {
acc.Add(metric, ival, tags)
continue
}
// sz, err := strconv.Atoi(szStr)
// if err != nil {
// return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError)
// }
// var read int
// for read < sz {
// line, err := rdr.ReadString('\n')
// fmt.Printf(line)
// if err != nil {
// return err
// }
// read += len(line)
// if len(line) == 1 || line[0] == '#' {
// continue
// }
// _, rPort, err := net.SplitHostPort(addr.Host)
// if err != nil {
// rPort = defaultPort
// }
// tags := map[string]string{"host": addr.String(), "port": rPort}
// parts := strings.SplitN(line, ":", 2)
// if len(parts) < 2 {
// continue
// }
// name := string(parts[0])
// metric, ok := Tracking[name]
// if !ok {
// // See if this is the keyspace line
// if strings.Contains(string(parts[1]), "keys=") {
// tags["database"] = name
// acc.Add("foo", 999, tags)
// }
// continue
// }
// val := strings.TrimSpace(parts[1])
// ival, err := strconv.ParseUint(val, 10, 64)
// if err == nil {
// acc.Add(metric, ival, tags)
// continue
// }
// fval, err := strconv.ParseFloat(val, 64)
// if err != nil {
// return err
// }
// acc.Add(metric, fval, tags)
// }
fval, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
acc.Add(metric, fval, tags)
}
return nil
}
// Parse the special Keyspace line at end of redis stats
// This is a special line that looks something like:
// db0:keys=2,expires=0,avg_ttl=0
// And there is one for each db on the redis instance
func gatherKeyspaceLine(
name string,
line string,
acc plugins.Accumulator,
tags map[string]string,
) {
if strings.Contains(line, "keys=") {
tags["database"] = name
dbparts := strings.Split(line, ",")
for _, dbp := range dbparts {
kv := strings.Split(dbp, "=")
ival, err := strconv.ParseUint(kv[1], 10, 64)
if err == nil {
acc.Add(kv[0], ival, tags)
}
}
}
}
func init() {
plugins.Add("redis", func() plugins.Plugin {
return &Redis{}

View File

@ -3,7 +3,7 @@ package redis
import (
"bufio"
"fmt"
"net"
"strings"
"testing"
"github.com/influxdb/telegraf/testutil"
@ -11,40 +11,12 @@ import (
"github.com/stretchr/testify/require"
)
func TestRedisGeneratesMetrics(t *testing.T) {
func TestRedisConnect(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer l.Close()
go func() {
c, err := l.Accept()
if err != nil {
return
}
buf := bufio.NewReader(c)
for {
line, err := buf.ReadString('\n')
if err != nil {
return
}
if line != "info\r\n" {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()
addr := fmt.Sprintf("redis://%s", l.Addr().String())
addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379")
r := &Redis{
Servers: []string{addr},
@ -52,102 +24,16 @@ func TestRedisGeneratesMetrics(t *testing.T) {
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
checkInt := []struct {
name string
value uint64
}{
{"uptime", 238},
{"clients", 1},
{"used_memory", 1003936},
{"used_memory_rss", 811008},
{"used_memory_peak", 1003936},
{"used_memory_lua", 33792},
{"rdb_changes_since_last_save", 0},
{"total_connections_received", 2},
{"total_commands_processed", 1},
{"instantaneous_ops_per_sec", 0},
{"sync_full", 0},
{"sync_partial_ok", 0},
{"sync_partial_err", 0},
{"expired_keys", 0},
{"evicted_keys", 0},
{"keyspace_hits", 0},
{"keyspace_misses", 0},
{"pubsub_channels", 0},
{"pubsub_patterns", 0},
{"latest_fork_usec", 0},
{"connected_slaves", 0},
{"master_repl_offset", 0},
{"repl_backlog_active", 0},
{"repl_backlog_size", 1048576},
{"repl_backlog_histlen", 0},
}
for _, c := range checkInt {
assert.True(t, acc.CheckValue(c.name, c.value))
}
checkFloat := []struct {
name string
value float64
}{
{"mem_fragmentation_ratio", 0.81},
{"used_cpu_sys", 0.14},
{"used_cpu_user", 0.05},
{"used_cpu_sys_children", 0.00},
{"used_cpu_user_children", 0.00},
}
for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}
}
func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer l.Close()
go func() {
c, err := l.Accept()
if err != nil {
return
}
buf := bufio.NewReader(c)
for {
line, err := buf.ReadString('\n')
if err != nil {
return
}
if line != "info\r\n" {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
}
}()
addr := fmt.Sprintf("redis://%s", l.Addr().String())
r := &Redis{
Servers: []string{addr},
}
var acc testutil.Accumulator
err = r.Gather(&acc)
err := r.Gather(&acc)
require.NoError(t, err)
}
func TestRedis_ParseMetrics(t *testing.T) {
var acc testutil.Accumulator
tags := map[string]string{"host": "redis.net"}
rdr := bufio.NewReader(strings.NewReader(testOutput))
err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err)
checkInt := []struct {
@ -179,6 +65,9 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
{"repl_backlog_active", 0},
{"repl_backlog_size", 1048576},
{"repl_backlog_histlen", 0},
{"keys", 2},
{"expires", 0},
{"avg_ttl", 0},
}
for _, c := range checkInt {
@ -284,5 +173,7 @@ used_cpu_sys_children:0.00
used_cpu_user_children:0.00
# Keyspace
db0:keys=2,expires=0,avg_ttl=0
(error) ERR unknown command 'eof'
`

View File

@ -59,7 +59,7 @@ exit_if_fail godep go install -v ./...
# Run the tests
exit_if_fail godep go vet ./...
exit_if_fail godep go test -v -short ./...
exit_if_fail godep go test -short ./...
# Build binaries
build "linux" "amd64" $VERSION

View File

@ -26,7 +26,6 @@ kafka:
ADVERTISED_HOST:
ADVERTISED_PORT: 9092
opentsdb:
image: lancope/opentsdb
ports:
@ -43,3 +42,8 @@ opentsdb:
image: lancope/opentsdb
ports:
- "24242:4242"
redis:
image: redis
ports:
- "6379:6379"

View File

@ -71,7 +71,7 @@ func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
return p.Values["value"] == val
}
}
fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val)
return false
}