diff --git a/Godeps b/Godeps index 784c6044c..c659227ca 100644 --- a/Godeps +++ b/Godeps @@ -27,6 +27,7 @@ github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea +github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 7f6143340..c809aa437 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -2,21 +2,47 @@ package redis import ( "bufio" - "errors" "fmt" - "net" + "io" + "log" "net/url" "strconv" "strings" "sync" "time" + "github.com/go-redis/redis" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) type Redis struct { Servers []string + + clients []Client + initialized bool +} + +type Client interface { + Info() *redis.StringCmd + BaseTags() map[string]string +} + +type RedisClient struct { + client *redis.Client + tags map[string]string +} + +func (r *RedisClient) Info() *redis.StringCmd { + return r.client.Info() +} + +func (r *RedisClient) BaseTags() map[string]string { + tags := make(map[string]string) + for k, v := range r.tags { + tags[k] = v + } + return tags } var sampleConfig = ` @@ -32,8 +58,6 @@ var sampleConfig = ` servers = ["tcp://localhost:6379"] ` -var defaultTimeout = 5 * time.Second - func (r *Redis) SampleConfig() string { return sampleConfig } @@ -48,111 +72,107 @@ var Tracking = map[string]string{ "role": "replication_role", } -var ErrProtocolError = errors.New("redis protocol error") - -const defaultPort = "6379" - -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (r *Redis) Gather(acc telegraf.Accumulator) error { - if len(r.Servers) == 0 { - url := &url.URL{ - Scheme: "tcp", - Host: ":6379", - } - r.gatherServer(url, acc) +func (r *Redis) init(acc telegraf.Accumulator) error { + if r.initialized { return nil } - var wg sync.WaitGroup - for _, serv := range r.Servers { + if len(r.Servers) == 0 { + r.Servers = []string{"tcp://localhost:6379"} + } + + r.clients = make([]Client, len(r.Servers)) + + for i, serv := range r.Servers { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { + log.Printf("W! [inputs.redis]: server URL found without scheme; please update your configuration file") serv = "tcp://" + serv } u, err := url.Parse(serv) if err != nil { - acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) - continue - } else if u.Scheme == "" { - // fallback to simple string based address (i.e. "10.0.0.1:10000") - u.Scheme = "tcp" - u.Host = serv - u.Path = "" + return fmt.Errorf("Unable to parse to address %q: %v", serv, err) } - if u.Scheme == "tcp" { - _, _, err := net.SplitHostPort(u.Host) - if err != nil { - u.Host = u.Host + ":" + defaultPort + + password := "" + if u.User != nil { + pw, ok := u.User.Password() + if ok { + password = pw } } + var address string + if u.Scheme == "unix" { + address = u.Path + } else { + address = u.Host + } + + client := redis.NewClient( + &redis.Options{ + Addr: address, + Password: password, + Network: u.Scheme, + PoolSize: 1, + }, + ) + + tags := map[string]string{} + if u.Scheme == "unix" { + tags["socket"] = u.Path + } else { + tags["server"] = u.Hostname() + tags["port"] = u.Port() + } + + r.clients[i] = &RedisClient{ + client: client, + tags: tags, + } + } + + r.initialized = true + return nil +} + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (r *Redis) Gather(acc telegraf.Accumulator) error { + if !r.initialized { + err := r.init(acc) + if err != nil { + return err + } + } + + var wg sync.WaitGroup + + for _, client := range r.clients { wg.Add(1) - go func(serv string) { + go func(client Client) { defer wg.Done() - acc.AddError(r.gatherServer(u, acc)) - }(serv) + acc.AddError(r.gatherServer(client, acc)) + }(client) } wg.Wait() return nil } -func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { - var address string - - if addr.Scheme == "unix" { - address = addr.Path - } else { - address = addr.Host - } - c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout) +func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { + info, err := client.Info().Result() if err != nil { - return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err) - } - defer c.Close() - - // Extend connection - c.SetDeadline(time.Now().Add(defaultTimeout)) - - if addr.User != nil { - pwd, set := addr.User.Password() - if set && pwd != "" { - c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd))) - - rdr := bufio.NewReader(c) - - line, err := rdr.ReadString('\n') - if err != nil { - return err - } - if line[0] != '+' { - return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) - } - } + return err } - c.Write([]byte("INFO\r\n")) - c.Write([]byte("EOF\r\n")) - rdr := bufio.NewReader(c) - - var tags map[string]string - - if addr.Scheme == "unix" { - tags = map[string]string{"socket": addr.Path} - } else { - // Setup tags for all redis metrics - host, port := "unknown", "unknown" - // If there's an error, ignore and use 'unknown' tags - host, port, _ = net.SplitHostPort(addr.Host) - tags = map[string]string{"server": host, "port": port} - } - return gatherInfoOutput(rdr, acc, tags) + rdr := strings.NewReader(info) + return gatherInfoOutput(rdr, acc, client.BaseTags()) } // gatherInfoOutput gathers func gatherInfoOutput( - rdr *bufio.Reader, + rdr io.Reader, acc telegraf.Accumulator, tags map[string]string, ) error { @@ -163,13 +183,11 @@ func gatherInfoOutput( fields := make(map[string]interface{}) for scanner.Scan() { line := scanner.Text() - if strings.Contains(line, "ERR") { - break - } if len(line) == 0 { continue } + if line[0] == '#' { if len(line) > 2 { section = line[2:]