Memcached plugin

This commit is contained in:
Maksim Naumov 2015-06-22 04:27:46 +02:00
parent 480f29bde7
commit 039fc80ed7
2 changed files with 179 additions and 0 deletions

View File

@ -0,0 +1,148 @@
package memcached
import (
"bufio"
"bytes"
"fmt"
"net"
"strconv"
"time"
"github.com/influxdb/telegraf/plugins"
)
// Memcached is a memcached plugin
type Memcached struct {
Servers []string
}
var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
var defaultTimeout = 5 * time.Second
// The list of metrics tha should be calculated
var sendAsIs = []string{
"get_hits",
"get_misses",
"evictions",
}
// SampleConfig returns sample configuration message
func (m *Memcached) SampleConfig() string {
return sampleConfig
}
// Description returns description of Memcached plugin
func (m *Memcached) Description() string {
return "Read metrics from one or many memcached servers"
}
// Gather reads stats from all configured servers accumulates stats
func (m *Memcached) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
return m.gatherServer(":11211", acc)
}
for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, acc); err != nil {
return err
}
}
return nil
}
func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}
// Connect
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
// Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout))
// Read and write buffer
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
// Send command
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
return err
}
if err = rw.Flush(); err != nil {
return err
}
// Read response
values := make(map[string]string)
for {
// Read line
line, _, errRead := rw.Reader.ReadLine()
if errRead != nil {
return errRead
}
// Done
if bytes.Equal(line, []byte("END")) {
break
}
// Read values
var name, value string
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
if errScan != nil || n != 2 {
return fmt.Errorf("unexpected line in stats response: %q", line)
}
// Save values
values[name] = value
}
//
tags := map[string]string{"server": address}
// Process values
for _, key := range sendAsIs {
if value, ok := values[key]; ok {
// Mostly it is the number
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
acc.Add(key, value, tags)
} else {
acc.Add(key, iValue, tags)
}
}
}
// Usage
acc.Add("usage", m.calcUsage(values), tags)
return nil
}
func (m *Memcached) calcUsage(values map[string]string) float64 {
maxBytes, maxOk := values["limit_maxbytes"]
bytes, bytesOk := values["bytes"]
if maxOk && bytesOk {
if fMax, errMax := strconv.ParseFloat(maxBytes, 64); errMax == nil && fMax > 0 {
if fBytes, errBytes := strconv.ParseFloat(bytes, 64); errBytes == nil {
return fBytes / fMax
}
}
}
return 0
}
func init() {
plugins.Add("memcached", func() plugins.Plugin {
return &Memcached{}
})
}

View File

@ -0,0 +1,31 @@
package memcached
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMemcachedGeneratesMetrics(t *testing.T) {
m := &Memcached{
Servers: []string{"localhost"},
}
var acc testutil.Accumulator
err := m.Gather(&acc)
require.NoError(t, err)
intMetrics := []string{"get_hits", "get_misses", "evictions"}
floatMetrics := []string{"usage"}
for _, metric := range intMetrics {
assert.True(t, acc.HasIntValue(metric), metric)
}
for _, metric := range floatMetrics {
assert.True(t, acc.HasFloatValue(metric), metric)
}
}