telegraf/plugins/inputs/redis/redis.go

303 lines
6.0 KiB
Go

package redis
import (
"bufio"
"fmt"
"io"
"log"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/go-redis/redis"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Redis struct {
Servers []string
clients []Client
initialized bool
}
type Client interface {
Info() *redis.StringCmd
BaseTags() map[string]string
}
type RedisClient struct {
client *redis.Client
tags map[string]string
}
func (r *RedisClient) Info() *redis.StringCmd {
return r.client.Info()
}
func (r *RedisClient) BaseTags() map[string]string {
tags := make(map[string]string)
for k, v := range r.tags {
tags[k] = v
}
return tags
}
var sampleConfig = `
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## tcp://localhost:6379
## tcp://:password@192.168.99.100
## unix:///var/run/redis.sock
##
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"]
`
func (r *Redis) SampleConfig() string {
return sampleConfig
}
func (r *Redis) Description() string {
return "Read metrics from one or many redis servers"
}
var Tracking = map[string]string{
"uptime_in_seconds": "uptime",
"connected_clients": "clients",
"role": "replication_role",
}
func (r *Redis) init(acc telegraf.Accumulator) error {
if r.initialized {
return nil
}
if len(r.Servers) == 0 {
r.Servers = []string{"tcp://localhost:6379"}
}
r.clients = make([]Client, len(r.Servers))
for i, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
log.Printf("W! [inputs.redis]: server URL found without scheme; please update your configuration file")
serv = "tcp://" + serv
}
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address %q: %v", serv, err)
}
password := ""
if u.User != nil {
pw, ok := u.User.Password()
if ok {
password = pw
}
}
var address string
if u.Scheme == "unix" {
address = u.Path
} else {
address = u.Host
}
client := redis.NewClient(
&redis.Options{
Addr: address,
Password: password,
Network: u.Scheme,
PoolSize: 1,
},
)
tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}
r.clients[i] = &RedisClient{
client: client,
tags: tags,
}
}
r.initialized = true
return nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.initialized {
err := r.init(acc)
if err != nil {
return err
}
}
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(client Client) {
defer wg.Done()
acc.AddError(r.gatherServer(client, acc))
}(client)
}
wg.Wait()
return nil
}
func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
info, err := client.Info().Result()
if err != nil {
return err
}
rdr := strings.NewReader(info)
return gatherInfoOutput(rdr, acc, client.BaseTags())
}
// gatherInfoOutput gathers
func gatherInfoOutput(
rdr io.Reader,
acc telegraf.Accumulator,
tags map[string]string,
) error {
var section string
var keyspace_hits, keyspace_misses int64
scanner := bufio.NewScanner(rdr)
fields := make(map[string]interface{})
for scanner.Scan() {
line := scanner.Text()
if len(line) == 0 {
continue
}
if line[0] == '#' {
if len(line) > 2 {
section = line[2:]
}
continue
}
parts := strings.SplitN(line, ":", 2)
if len(parts) < 2 {
continue
}
name := string(parts[0])
if section == "Server" {
if name != "lru_clock" && name != "uptime_in_seconds" && name != "redis_version" {
continue
}
}
if name == "mem_allocator" {
continue
}
if strings.HasSuffix(name, "_human") {
continue
}
metric, ok := Tracking[name]
if !ok {
if section == "Keyspace" {
kline := strings.TrimSpace(string(parts[1]))
gatherKeyspaceLine(name, kline, acc, tags)
continue
}
metric = name
}
val := strings.TrimSpace(parts[1])
// Try parsing as int
if ival, err := strconv.ParseInt(val, 10, 64); err == nil {
switch name {
case "keyspace_hits":
keyspace_hits = ival
case "keyspace_misses":
keyspace_misses = ival
case "rdb_last_save_time":
// influxdb can't calculate this, so we have to do it
fields["rdb_last_save_time_elapsed"] = time.Now().Unix() - ival
}
fields[metric] = ival
continue
}
// Try parsing as a float
if fval, err := strconv.ParseFloat(val, 64); err == nil {
fields[metric] = fval
continue
}
// Treat it as a string
if name == "role" {
tags["replication_role"] = val
continue
}
fields[metric] = val
}
var keyspace_hitrate float64 = 0.0
if keyspace_hits != 0 || keyspace_misses != 0 {
keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses)
}
fields["keyspace_hitrate"] = keyspace_hitrate
acc.AddFields("redis", fields, tags)
return nil
}
// Parse the special Keyspace line at end of redis stats
// This is a special line that looks something like:
// db0:keys=2,expires=0,avg_ttl=0
// And there is one for each db on the redis instance
func gatherKeyspaceLine(
name string,
line string,
acc telegraf.Accumulator,
global_tags map[string]string,
) {
if strings.Contains(line, "keys=") {
fields := make(map[string]interface{})
tags := make(map[string]string)
for k, v := range global_tags {
tags[k] = v
}
tags["database"] = name
dbparts := strings.Split(line, ",")
for _, dbp := range dbparts {
kv := strings.Split(dbp, "=")
ival, err := strconv.ParseInt(kv[1], 10, 64)
if err == nil {
fields[kv[0]] = ival
}
}
acc.AddFields("redis_keyspace", fields, tags)
}
}
func init() {
inputs.Add("redis", func() telegraf.Input {
return &Redis{}
})
}