From f8d64a73783a1d92df2a06595521ae3f3af153d9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 22 Sep 2015 18:13:35 -0700 Subject: [PATCH] Redis: include per-db keyspace info Closes #205 --- CHANGELOG.md | 13 ++- Makefile | 2 +- plugins/redis/redis.go | 154 ++++++++++++++++++------------------ plugins/redis/redis_test.go | 135 +++---------------------------- scripts/circle-test.sh | 2 +- scripts/docker-compose.yml | 6 +- testutil/accumulator.go | 2 +- 7 files changed, 111 insertions(+), 203 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22256fe38..33b3bd220 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## v0.1.10 [unreleased] + +### Release Notes + +### Features +- [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info + +### Bugfixes + ## v0.1.9 [2015-09-22] ### Release Notes @@ -14,7 +23,9 @@ file with only the cpu plugin defined, and the influxdb output defined. - **Breaking Change**: The CPU collection plugin has been refactored to fix some bugs and outdated dependency issues. At the same time, I also decided to fix a naming consistency issue, so cpu_percentageIdle will become cpu_usage_idle. -Also, all CPU time measurements now have it indicated in their name, so cpu_idle will become cpu_time_idle. Additionally, cpu_time measurements are going to be dropped in the default config. +Also, all CPU time measurements now have it indicated in their name, so cpu_idle +will become cpu_time_idle. Additionally, cpu_time measurements are going to be +dropped in the default config. - **Breaking Change**: The memory plugin has been refactored and some measurements have been renamed for consistency. Some measurements have also been removed from being outputted. They are still being collected by gopsutil, and could easily be re-added in a "verbose" mode if there is demand for it. diff --git a/Makefile b/Makefile index 751cde8bb..60555765e 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ ifeq ($(UNAME), Linux) endif test: prepare docker-compose - $(GOBIN)/godep go test -v ./... + $(GOBIN)/godep go test ./... test-short: prepare $(GOBIN)/godep go test -short ./... diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index a7a8aca10..8270c2d6a 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -6,7 +6,7 @@ import ( "fmt" "net" "net/url" - // "strconv" + "strconv" "strings" "sync" @@ -137,88 +137,90 @@ func (r *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error { } } - c.Write([]byte("info\r\n")) - + c.Write([]byte("INFO\r\n")) + c.Write([]byte("EOF\r\n")) rdr := bufio.NewReader(c) + + // Setup tags for all redis metrics + _, rPort, err := net.SplitHostPort(addr.Host) + if err != nil { + rPort = defaultPort + } + tags := map[string]string{"host": addr.String(), "port": rPort} + + return gatherInfoOutput(rdr, acc, tags) +} + +// gatherInfoOutput gathers +func gatherInfoOutput( + rdr *bufio.Reader, + acc plugins.Accumulator, + tags map[string]string, +) error { scanner := bufio.NewScanner(rdr) for scanner.Scan() { - fmt.Println(scanner.Text()) + line := scanner.Text() + if strings.Contains(line, "ERR") { + break + } + + if len(line) == 0 || line[0] == '#' { + continue + } + + parts := strings.SplitN(line, ":", 2) + if len(parts) < 2 { + continue + } + + name := string(parts[0]) + metric, ok := Tracking[name] + if !ok { + kline := strings.TrimSpace(string(parts[1])) + gatherKeyspaceLine(name, kline, acc, tags) + continue + } + + val := strings.TrimSpace(parts[1]) + ival, err := strconv.ParseUint(val, 10, 64) + if err == nil { + acc.Add(metric, ival, tags) + continue + } + + fval, err := strconv.ParseFloat(val, 64) + if err != nil { + return err + } + + acc.Add(metric, fval, tags) } - if err := scanner.Err(); err != nil { - fmt.Println("reading standard input:", err) - } - - // line, err := rdr.ReadString('\n') - // if err != nil { - // return err - // } - - // if line[0] != '$' { - // return fmt.Errorf("bad line start: %s", ErrProtocolError) - // } - - // line = strings.TrimSpace(line) - - // szStr := line[0:] - - // sz, err := strconv.Atoi(szStr) - // if err != nil { - // return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError) - // } - - // var read int - - // for read < sz { - // line, err := rdr.ReadString('\n') - // fmt.Printf(line) - // if err != nil { - // return err - // } - - // read += len(line) - // if len(line) == 1 || line[0] == '#' { - // continue - // } - - // _, rPort, err := net.SplitHostPort(addr.Host) - // if err != nil { - // rPort = defaultPort - // } - // tags := map[string]string{"host": addr.String(), "port": rPort} - - // parts := strings.SplitN(line, ":", 2) - // if len(parts) < 2 { - // continue - // } - // name := string(parts[0]) - // metric, ok := Tracking[name] - // if !ok { - // // See if this is the keyspace line - // if strings.Contains(string(parts[1]), "keys=") { - // tags["database"] = name - // acc.Add("foo", 999, tags) - // } - // continue - // } - - // val := strings.TrimSpace(parts[1]) - // ival, err := strconv.ParseUint(val, 10, 64) - // if err == nil { - // acc.Add(metric, ival, tags) - // continue - // } - - // fval, err := strconv.ParseFloat(val, 64) - // if err != nil { - // return err - // } - - // acc.Add(metric, fval, tags) - // } - return nil } +// Parse the special Keyspace line at end of redis stats +// This is a special line that looks something like: +// db0:keys=2,expires=0,avg_ttl=0 +// And there is one for each db on the redis instance +func gatherKeyspaceLine( + name string, + line string, + acc plugins.Accumulator, + tags map[string]string, +) { + if strings.Contains(line, "keys=") { + tags["database"] = name + dbparts := strings.Split(line, ",") + for _, dbp := range dbparts { + kv := strings.Split(dbp, "=") + ival, err := strconv.ParseUint(kv[1], 10, 64) + if err == nil { + acc.Add(kv[0], ival, tags) + } + } + } +} + func init() { plugins.Add("redis", func() plugins.Plugin { return &Redis{} diff --git a/plugins/redis/redis_test.go b/plugins/redis/redis_test.go index adf38ef75..2b369e0a1 100644 --- a/plugins/redis/redis_test.go +++ b/plugins/redis/redis_test.go @@ -3,7 +3,7 @@ package redis import ( "bufio" "fmt" - "net" + "strings" "testing" "github.com/influxdb/telegraf/testutil" @@ -11,40 +11,12 @@ import ( "github.com/stretchr/testify/require" ) -func TestRedisGeneratesMetrics(t *testing.T) { +func TestRedisConnect(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - l, err := net.Listen("tcp", ":0") - require.NoError(t, err) - - defer l.Close() - - go func() { - c, err := l.Accept() - if err != nil { - return - } - - buf := bufio.NewReader(c) - - for { - line, err := buf.ReadString('\n') - if err != nil { - return - } - - if line != "info\r\n" { - return - } - - fmt.Fprintf(c, "$%d\n", len(testOutput)) - c.Write([]byte(testOutput)) - } - }() - - addr := fmt.Sprintf("redis://%s", l.Addr().String()) + addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379") r := &Redis{ Servers: []string{addr}, @@ -52,102 +24,16 @@ func TestRedisGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err = r.Gather(&acc) + err := r.Gather(&acc) require.NoError(t, err) - - checkInt := []struct { - name string - value uint64 - }{ - {"uptime", 238}, - {"clients", 1}, - {"used_memory", 1003936}, - {"used_memory_rss", 811008}, - {"used_memory_peak", 1003936}, - {"used_memory_lua", 33792}, - {"rdb_changes_since_last_save", 0}, - {"total_connections_received", 2}, - {"total_commands_processed", 1}, - {"instantaneous_ops_per_sec", 0}, - {"sync_full", 0}, - {"sync_partial_ok", 0}, - {"sync_partial_err", 0}, - {"expired_keys", 0}, - {"evicted_keys", 0}, - {"keyspace_hits", 0}, - {"keyspace_misses", 0}, - {"pubsub_channels", 0}, - {"pubsub_patterns", 0}, - {"latest_fork_usec", 0}, - {"connected_slaves", 0}, - {"master_repl_offset", 0}, - {"repl_backlog_active", 0}, - {"repl_backlog_size", 1048576}, - {"repl_backlog_histlen", 0}, - } - - for _, c := range checkInt { - assert.True(t, acc.CheckValue(c.name, c.value)) - } - - checkFloat := []struct { - name string - value float64 - }{ - {"mem_fragmentation_ratio", 0.81}, - {"used_cpu_sys", 0.14}, - {"used_cpu_user", 0.05}, - {"used_cpu_sys_children", 0.00}, - {"used_cpu_user_children", 0.00}, - } - - for _, c := range checkFloat { - assert.True(t, acc.CheckValue(c.name, c.value)) - } } -func TestRedisCanPullStatsFromMultipleServers(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - l, err := net.Listen("tcp", ":0") - require.NoError(t, err) - - defer l.Close() - - go func() { - c, err := l.Accept() - if err != nil { - return - } - - buf := bufio.NewReader(c) - - for { - line, err := buf.ReadString('\n') - if err != nil { - return - } - - if line != "info\r\n" { - return - } - - fmt.Fprintf(c, "$%d\n", len(testOutput)) - c.Write([]byte(testOutput)) - } - }() - - addr := fmt.Sprintf("redis://%s", l.Addr().String()) - - r := &Redis{ - Servers: []string{addr}, - } - +func TestRedis_ParseMetrics(t *testing.T) { var acc testutil.Accumulator + tags := map[string]string{"host": "redis.net"} + rdr := bufio.NewReader(strings.NewReader(testOutput)) - err = r.Gather(&acc) + err := gatherInfoOutput(rdr, &acc, tags) require.NoError(t, err) checkInt := []struct { @@ -179,6 +65,9 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) { {"repl_backlog_active", 0}, {"repl_backlog_size", 1048576}, {"repl_backlog_histlen", 0}, + {"keys", 2}, + {"expires", 0}, + {"avg_ttl", 0}, } for _, c := range checkInt { @@ -284,5 +173,7 @@ used_cpu_sys_children:0.00 used_cpu_user_children:0.00 # Keyspace +db0:keys=2,expires=0,avg_ttl=0 +(error) ERR unknown command 'eof' ` diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index e8c7c131b..5badaccde 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -59,7 +59,7 @@ exit_if_fail godep go install -v ./... # Run the tests exit_if_fail godep go vet ./... -exit_if_fail godep go test -v -short ./... +exit_if_fail godep go test -short ./... # Build binaries build "linux" "amd64" $VERSION diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index c7d360863..3772a3b8b 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -26,7 +26,6 @@ kafka: ADVERTISED_HOST: ADVERTISED_PORT: 9092 - opentsdb: image: lancope/opentsdb ports: @@ -43,3 +42,8 @@ opentsdb: image: lancope/opentsdb ports: - "24242:4242" + +redis: + image: redis + ports: + - "6379:6379" diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 10d35a77e..1eca05946 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -71,7 +71,7 @@ func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { return p.Values["value"] == val } } - + fmt.Printf("CheckValue failed, measurement %s, value %s", measurement, val) return false }