diff --git a/plugins/all/all.go b/plugins/all/all.go index 0b0f6236d..cc34f2235 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,7 +1,8 @@ package all import ( - "github.com/influxdb/telegraf/plugins/ceph" + _ "github.com/influxdb/telegraf/plugins/ceph" + _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" _ "github.com/influxdb/telegraf/plugins/redis" diff --git a/plugins/memcached/memcached.go b/plugins/memcached/memcached.go new file mode 100644 index 000000000..802fae35d --- /dev/null +++ b/plugins/memcached/memcached.go @@ -0,0 +1,134 @@ +package memcached + +import ( + "bufio" + "bytes" + "fmt" + "net" + "strconv" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +// Memcached is a memcached plugin +type Memcached struct { + Servers []string +} + +var sampleConfig = ` +# An array of address to gather stats about. Specify an ip on hostname +# with optional port. ie localhost, 10.0.0.1:11211, etc. +# +# If no servers are specified, then localhost is used as the host. +servers = ["localhost"]` + +var defaultTimeout = 5 * time.Second + +// The list of metrics tha should be calculated +var sendAsIs = []string{ + "get_hits", + "get_misses", + "evictions", + "limit_maxbytes", + "bytes", +} + +// SampleConfig returns sample configuration message +func (m *Memcached) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Memcached plugin +func (m *Memcached) Description() string { + return "Read metrics from one or many memcached servers" +} + +// Gather reads stats from all configured servers accumulates stats +func (m *Memcached) Gather(acc plugins.Accumulator) error { + if len(m.Servers) == 0 { + return m.gatherServer(":11211", acc) + } + + for _, serverAddress := range m.Servers { + if err := m.gatherServer(serverAddress, acc); err != nil { + return err + } + } + + return nil +} + +func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error { + _, _, err := net.SplitHostPort(address) + if err != nil { + address = address + ":11211" + } + + // Connect + conn, err := net.DialTimeout("tcp", address, defaultTimeout) + if err != nil { + return err + } + defer conn.Close() + + // Extend connection + conn.SetDeadline(time.Now().Add(defaultTimeout)) + + // Read and write buffer + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + // Send command + if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil { + return err + } + if err = rw.Flush(); err != nil { + return err + } + + // Read response + values := make(map[string]string) + + for { + // Read line + line, _, errRead := rw.Reader.ReadLine() + if errRead != nil { + return errRead + } + // Done + if bytes.Equal(line, []byte("END")) { + break + } + // Read values + var name, value string + n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value) + if errScan != nil || n != 2 { + return fmt.Errorf("unexpected line in stats response: %q", line) + } + + // Save values + values[name] = value + } + + // + tags := map[string]string{"server": address} + + // Process values + for _, key := range sendAsIs { + if value, ok := values[key]; ok { + // Mostly it is the number + if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil { + acc.Add(key, value, tags) + } else { + acc.Add(key, iValue, tags) + } + } + } + return nil +} + +func init() { + plugins.Add("memcached", func() plugins.Plugin { + return &Memcached{} + }) +} diff --git a/plugins/memcached/memcached_test.go b/plugins/memcached/memcached_test.go new file mode 100644 index 000000000..08e696fb7 --- /dev/null +++ b/plugins/memcached/memcached_test.go @@ -0,0 +1,26 @@ +package memcached + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemcachedGeneratesMetrics(t *testing.T) { + m := &Memcached{ + Servers: []string{"localhost"}, + } + + var acc testutil.Accumulator + + err := m.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"} + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric), metric) + } +} diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index 2dd184a12..831d74dbe 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "net/url" "strconv" "strings" "sync" @@ -20,8 +21,9 @@ type Redis struct { } var sampleConfig = ` -# An array of address to gather stats about. Specify an ip on hostname -# with optional port. ie localhost, 10.10.3.33:18832, etc. +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832, +# 10.0.0.1:10000, etc. # # If no servers are specified, then localhost is used as the host. servers = ["localhost"]` @@ -73,7 +75,10 @@ var ErrProtocolError = errors.New("redis protocol error") // Returns one of the errors encountered while gather stats (if any). func (g *Redis) Gather(acc plugins.Accumulator) error { if len(g.Servers) == 0 { - g.gatherServer(":6379", acc) + url := &url.URL{ + Host: ":6379", + } + g.gatherServer(url, acc) return nil } @@ -82,10 +87,19 @@ func (g *Redis) Gather(acc plugins.Accumulator) error { var outerr error for _, serv := range g.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Scheme = "tcp" + u.Host = serv + u.Path = "" + } wg.Add(1) go func(serv string) { defer wg.Done() - outerr = g.gatherServer(serv, acc) + outerr = g.gatherServer(u, acc) }(serv) } @@ -96,17 +110,34 @@ func (g *Redis) Gather(acc plugins.Accumulator) error { const defaultPort = "6379" -func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error { +func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error { if g.c == nil { - _, _, err := net.SplitHostPort(addr) + _, _, err := net.SplitHostPort(addr.Host) if err != nil { - addr = addr + ":" + defaultPort + addr.Host = addr.Host + ":" + defaultPort } - c, err := net.Dial("tcp", addr) + c, err := net.Dial("tcp", addr.Host) if err != nil { - return fmt.Errorf("Unable to connect to redis server '%s': %s", addr, err) + return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) + } + + if addr.User != nil { + pwd, set := addr.User.Password() + if set && pwd != "" { + c.Write([]byte(fmt.Sprintf("AUTH %s\n", pwd))) + + r := bufio.NewReader(c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + if line[0] != '+' { + return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) + } + } } g.c = c @@ -157,7 +188,7 @@ func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error { continue } - tags := map[string]string{"host": addr} + tags := map[string]string{"host": addr.String()} val := strings.TrimSpace(parts[1]) ival, err := strconv.ParseUint(val, 10, 64) diff --git a/plugins/redis/redis_test.go b/plugins/redis/redis_test.go index 52f3c9b62..317fde783 100644 --- a/plugins/redis/redis_test.go +++ b/plugins/redis/redis_test.go @@ -40,7 +40,7 @@ func TestRedisGeneratesMetrics(t *testing.T) { } }() - addr := l.Addr().String() + addr := fmt.Sprintf("redis://%s", l.Addr().String()) r := &Redis{ Servers: []string{addr}, @@ -131,7 +131,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) { } }() - addr := l.Addr().String() + addr := fmt.Sprintf("redis://%s", l.Addr().String()) r := &Redis{ Servers: []string{addr},