From 039fc80ed790302e94fcb57605e4ef103ad3aea3 Mon Sep 17 00:00:00 2001 From: Maksim Naumov Date: Mon, 22 Jun 2015 04:27:46 +0200 Subject: [PATCH 1/6] Memcached plugin --- plugins/memcached/memcached.go | 148 ++++++++++++++++++++++++++++ plugins/memcached/memcached_test.go | 31 ++++++ 2 files changed, 179 insertions(+) create mode 100644 plugins/memcached/memcached.go create mode 100644 plugins/memcached/memcached_test.go diff --git a/plugins/memcached/memcached.go b/plugins/memcached/memcached.go new file mode 100644 index 000000000..9de2a5959 --- /dev/null +++ b/plugins/memcached/memcached.go @@ -0,0 +1,148 @@ +package memcached + +import ( + "bufio" + "bytes" + "fmt" + "net" + "strconv" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +// Memcached is a memcached plugin +type Memcached struct { + Servers []string +} + +var sampleConfig = ` +# An array of address to gather stats about. Specify an ip on hostname +# with optional port. ie localhost, 10.0.0.1:11211, etc. +# +# If no servers are specified, then localhost is used as the host. +servers = ["localhost"]` + +var defaultTimeout = 5 * time.Second + +// The list of metrics tha should be calculated +var sendAsIs = []string{ + "get_hits", + "get_misses", + "evictions", +} + +// SampleConfig returns sample configuration message +func (m *Memcached) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Memcached plugin +func (m *Memcached) Description() string { + return "Read metrics from one or many memcached servers" +} + +// Gather reads stats from all configured servers accumulates stats +func (m *Memcached) Gather(acc plugins.Accumulator) error { + if len(m.Servers) == 0 { + return m.gatherServer(":11211", acc) + } + + for _, serverAddress := range m.Servers { + if err := m.gatherServer(serverAddress, acc); err != nil { + return err + } + } + + return nil +} + +func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error { + _, _, err := net.SplitHostPort(address) + if err != nil { + address = address + ":11211" + } + + // Connect + conn, err := net.DialTimeout("tcp", address, defaultTimeout) + if err != nil { + return err + } + defer conn.Close() + + // Extend connection + conn.SetDeadline(time.Now().Add(defaultTimeout)) + + // Read and write buffer + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + // Send command + if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil { + return err + } + if err = rw.Flush(); err != nil { + return err + } + + // Read response + values := make(map[string]string) + + for { + // Read line + line, _, errRead := rw.Reader.ReadLine() + if errRead != nil { + return errRead + } + // Done + if bytes.Equal(line, []byte("END")) { + break + } + // Read values + var name, value string + n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value) + if errScan != nil || n != 2 { + return fmt.Errorf("unexpected line in stats response: %q", line) + } + + // Save values + values[name] = value + } + + // + tags := map[string]string{"server": address} + + // Process values + for _, key := range sendAsIs { + if value, ok := values[key]; ok { + // Mostly it is the number + if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil { + acc.Add(key, value, tags) + } else { + acc.Add(key, iValue, tags) + } + } + } + + // Usage + acc.Add("usage", m.calcUsage(values), tags) + return nil +} + +func (m *Memcached) calcUsage(values map[string]string) float64 { + maxBytes, maxOk := values["limit_maxbytes"] + bytes, bytesOk := values["bytes"] + if maxOk && bytesOk { + if fMax, errMax := strconv.ParseFloat(maxBytes, 64); errMax == nil && fMax > 0 { + if fBytes, errBytes := strconv.ParseFloat(bytes, 64); errBytes == nil { + return fBytes / fMax + } + } + } + return 0 +} + +func init() { + plugins.Add("memcached", func() plugins.Plugin { + return &Memcached{} + }) +} diff --git a/plugins/memcached/memcached_test.go b/plugins/memcached/memcached_test.go new file mode 100644 index 000000000..04184fff6 --- /dev/null +++ b/plugins/memcached/memcached_test.go @@ -0,0 +1,31 @@ +package memcached + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemcachedGeneratesMetrics(t *testing.T) { + m := &Memcached{ + Servers: []string{"localhost"}, + } + + var acc testutil.Accumulator + + err := m.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{"get_hits", "get_misses", "evictions"} + floatMetrics := []string{"usage"} + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric), metric) + } + + for _, metric := range floatMetrics { + assert.True(t, acc.HasFloatValue(metric), metric) + } +} From e1c7dc80aeb9473e54c9368a78e5b5ca4723b400 Mon Sep 17 00:00:00 2001 From: JP Date: Sat, 20 Jun 2015 13:40:20 -0500 Subject: [PATCH 2/6] redis plugin accepts URI or string, support Redis AUTH --- plugins/redis/redis.go | 47 ++++++++++++++++++++++++++++++------- plugins/redis/redis_test.go | 4 ++-- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index 2b56ac8c3..24c7fb6bd 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "net/url" "strconv" "strings" "sync" @@ -20,8 +21,9 @@ type Redis struct { } var sampleConfig = ` -# An array of address to gather stats about. Specify an ip on hostname -# with optional port. ie localhost, 10.10.3.33:18832, etc. +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832, +# 10.0.0.1:10000, etc. # # If no servers are specified, then localhost is used as the host. servers = ["localhost"]` @@ -73,7 +75,10 @@ var ErrProtocolError = errors.New("redis protocol error") // Returns one of the errors encountered while gather stats (if any). func (g *Redis) Gather(acc plugins.Accumulator) error { if len(g.Servers) == 0 { - g.gatherServer(":6379", acc) + url := &url.URL{ + Host: ":6379", + } + g.gatherServer(url, acc) return nil } @@ -82,10 +87,17 @@ func (g *Redis) Gather(acc plugins.Accumulator) error { var outerr error for _, serv := range g.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Host = serv + } wg.Add(1) go func(serv string) { defer wg.Done() - outerr = g.gatherServer(serv, acc) + outerr = g.gatherServer(u, acc) }(serv) } @@ -96,17 +108,34 @@ func (g *Redis) Gather(acc plugins.Accumulator) error { const defaultPort = "6379" -func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error { +func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error { if g.c == nil { - _, _, err := net.SplitHostPort(addr) + _, _, err := net.SplitHostPort(addr.Host) if err != nil { - addr = addr + ":" + defaultPort + addr.Host = addr.Host + ":" + defaultPort } - c, err := net.Dial("tcp", addr) + c, err := net.Dial("tcp", addr.Host) if err != nil { - return fmt.Errorf("Unable to connect to redis server '%s': %s", addr, err) + return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) + } + + if addr.User != nil { + pwd, set := addr.User.Password() + if set && pwd != "" { + c.Write([]byte(fmt.Sprintf("AUTH %s\n", pwd))) + + r := bufio.NewReader(c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + if line[0] != '+' { + return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) + } + } } g.c = c diff --git a/plugins/redis/redis_test.go b/plugins/redis/redis_test.go index 52f3c9b62..317fde783 100644 --- a/plugins/redis/redis_test.go +++ b/plugins/redis/redis_test.go @@ -40,7 +40,7 @@ func TestRedisGeneratesMetrics(t *testing.T) { } }() - addr := l.Addr().String() + addr := fmt.Sprintf("redis://%s", l.Addr().String()) r := &Redis{ Servers: []string{addr}, @@ -131,7 +131,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) { } }() - addr := l.Addr().String() + addr := fmt.Sprintf("redis://%s", l.Addr().String()) r := &Redis{ Servers: []string{addr}, From b86d789abe3f55ff611220fb87be12e42bc632dc Mon Sep 17 00:00:00 2001 From: Maksim Naumov Date: Tue, 23 Jun 2015 09:44:39 +0200 Subject: [PATCH 3/6] Explore "limit_maxbytes" and "bytes" individually --- plugins/memcached/memcached.go | 18 ++---------------- plugins/memcached/memcached_test.go | 7 +------ 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/plugins/memcached/memcached.go b/plugins/memcached/memcached.go index 9de2a5959..802fae35d 100644 --- a/plugins/memcached/memcached.go +++ b/plugins/memcached/memcached.go @@ -30,6 +30,8 @@ var sendAsIs = []string{ "get_hits", "get_misses", "evictions", + "limit_maxbytes", + "bytes", } // SampleConfig returns sample configuration message @@ -122,25 +124,9 @@ func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error } } } - - // Usage - acc.Add("usage", m.calcUsage(values), tags) return nil } -func (m *Memcached) calcUsage(values map[string]string) float64 { - maxBytes, maxOk := values["limit_maxbytes"] - bytes, bytesOk := values["bytes"] - if maxOk && bytesOk { - if fMax, errMax := strconv.ParseFloat(maxBytes, 64); errMax == nil && fMax > 0 { - if fBytes, errBytes := strconv.ParseFloat(bytes, 64); errBytes == nil { - return fBytes / fMax - } - } - } - return 0 -} - func init() { plugins.Add("memcached", func() plugins.Plugin { return &Memcached{} diff --git a/plugins/memcached/memcached_test.go b/plugins/memcached/memcached_test.go index 04184fff6..08e696fb7 100644 --- a/plugins/memcached/memcached_test.go +++ b/plugins/memcached/memcached_test.go @@ -18,14 +18,9 @@ func TestMemcachedGeneratesMetrics(t *testing.T) { err := m.Gather(&acc) require.NoError(t, err) - intMetrics := []string{"get_hits", "get_misses", "evictions"} - floatMetrics := []string{"usage"} + intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"} for _, metric := range intMetrics { assert.True(t, acc.HasIntValue(metric), metric) } - - for _, metric := range floatMetrics { - assert.True(t, acc.HasFloatValue(metric), metric) - } } From 5fbd07b146c66ed3b05765b9cefeb3cdf148d6e1 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Tue, 23 Jun 2015 14:51:32 -0700 Subject: [PATCH 4/6] Add memcached to the all plugins package --- plugins/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/all/all.go b/plugins/all/all.go index 8acedb33f..71381aa87 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" _ "github.com/influxdb/telegraf/plugins/redis" From a1f7d5549bd279d8604a0cb421e46c2b0647daaf Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Tue, 23 Jun 2015 14:51:38 -0700 Subject: [PATCH 5/6] Fix type error using URL as a string --- plugins/redis/redis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index 6f2648c7b..e936ae723 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -186,7 +186,7 @@ func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error { continue } - tags := map[string]string{"host": addr} + tags := map[string]string{"host": addr.String()} val := strings.TrimSpace(parts[1]) ival, err := strconv.ParseUint(val, 10, 64) From 86a6f337f68194603d29082b42eaca4853d390c9 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Tue, 23 Jun 2015 14:51:55 -0700 Subject: [PATCH 6/6] Cleanup the URL when one isn't specified --- plugins/redis/redis.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go index e936ae723..831d74dbe 100644 --- a/plugins/redis/redis.go +++ b/plugins/redis/redis.go @@ -92,7 +92,9 @@ func (g *Redis) Gather(acc plugins.Accumulator) error { return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) } else if u.Scheme == "" { // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Scheme = "tcp" u.Host = serv + u.Path = "" } wg.Add(1) go func(serv string) {