Use go-redis for the redis input (#3661)

This commit is contained in:
Daniel Nelson 2018-01-17 14:57:46 -08:00 committed by GitHub
parent ad921a3840
commit fa5f1bf6d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 84 deletions

1
Godeps
View File

@ -27,6 +27,7 @@ github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7
github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7
github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc
github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f

View File

@ -2,21 +2,47 @@ package redis
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"net" "io"
"log"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/go-redis/redis"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
type Redis struct { type Redis struct {
Servers []string Servers []string
clients []Client
initialized bool
}
type Client interface {
Info() *redis.StringCmd
BaseTags() map[string]string
}
type RedisClient struct {
client *redis.Client
tags map[string]string
}
func (r *RedisClient) Info() *redis.StringCmd {
return r.client.Info()
}
func (r *RedisClient) BaseTags() map[string]string {
tags := make(map[string]string)
for k, v := range r.tags {
tags[k] = v
}
return tags
} }
var sampleConfig = ` var sampleConfig = `
@ -32,8 +58,6 @@ var sampleConfig = `
servers = ["tcp://localhost:6379"] servers = ["tcp://localhost:6379"]
` `
var defaultTimeout = 5 * time.Second
func (r *Redis) SampleConfig() string { func (r *Redis) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -48,111 +72,107 @@ var Tracking = map[string]string{
"role": "replication_role", "role": "replication_role",
} }
var ErrProtocolError = errors.New("redis protocol error") func (r *Redis) init(acc telegraf.Accumulator) error {
if r.initialized {
const defaultPort = "6379"
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if len(r.Servers) == 0 {
url := &url.URL{
Scheme: "tcp",
Host: ":6379",
}
r.gatherServer(url, acc)
return nil return nil
} }
var wg sync.WaitGroup if len(r.Servers) == 0 {
for _, serv := range r.Servers { r.Servers = []string{"tcp://localhost:6379"}
}
r.clients = make([]Client, len(r.Servers))
for i, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
log.Printf("W! [inputs.redis]: server URL found without scheme; please update your configuration file")
serv = "tcp://" + serv serv = "tcp://" + serv
} }
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) return fmt.Errorf("Unable to parse to address %q: %v", serv, err)
continue
} else if u.Scheme == "" {
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Scheme = "tcp"
u.Host = serv
u.Path = ""
} }
if u.Scheme == "tcp" {
_, _, err := net.SplitHostPort(u.Host) password := ""
if err != nil { if u.User != nil {
u.Host = u.Host + ":" + defaultPort pw, ok := u.User.Password()
if ok {
password = pw
} }
} }
var address string
if u.Scheme == "unix" {
address = u.Path
} else {
address = u.Host
}
client := redis.NewClient(
&redis.Options{
Addr: address,
Password: password,
Network: u.Scheme,
PoolSize: 1,
},
)
tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}
r.clients[i] = &RedisClient{
client: client,
tags: tags,
}
}
r.initialized = true
return nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.initialized {
err := r.init(acc)
if err != nil {
return err
}
}
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1) wg.Add(1)
go func(serv string) { go func(client Client) {
defer wg.Done() defer wg.Done()
acc.AddError(r.gatherServer(u, acc)) acc.AddError(r.gatherServer(client, acc))
}(serv) }(client)
} }
wg.Wait() wg.Wait()
return nil return nil
} }
func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
var address string info, err := client.Info().Result()
if addr.Scheme == "unix" {
address = addr.Path
} else {
address = addr.Host
}
c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err)
}
defer c.Close()
// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
rdr := bufio.NewReader(c)
line, err := rdr.ReadString('\n')
if err != nil { if err != nil {
return err return err
} }
if line[0] != '+' {
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
}
}
}
c.Write([]byte("INFO\r\n")) rdr := strings.NewReader(info)
c.Write([]byte("EOF\r\n")) return gatherInfoOutput(rdr, acc, client.BaseTags())
rdr := bufio.NewReader(c)
var tags map[string]string
if addr.Scheme == "unix" {
tags = map[string]string{"socket": addr.Path}
} else {
// Setup tags for all redis metrics
host, port := "unknown", "unknown"
// If there's an error, ignore and use 'unknown' tags
host, port, _ = net.SplitHostPort(addr.Host)
tags = map[string]string{"server": host, "port": port}
}
return gatherInfoOutput(rdr, acc, tags)
} }
// gatherInfoOutput gathers // gatherInfoOutput gathers
func gatherInfoOutput( func gatherInfoOutput(
rdr *bufio.Reader, rdr io.Reader,
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
) error { ) error {
@ -163,13 +183,11 @@ func gatherInfoOutput(
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
if strings.Contains(line, "ERR") {
break
}
if len(line) == 0 { if len(line) == 0 {
continue continue
} }
if line[0] == '#' { if line[0] == '#' {
if len(line) > 2 { if len(line) > 2 {
section = line[2:] section = line[2:]