186 lines
3.8 KiB
Go
186 lines
3.8 KiB
Go
package memcached
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal/errchan"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
// Memcached is a memcached plugin
|
|
type Memcached struct {
|
|
Servers []string
|
|
UnixSockets []string
|
|
}
|
|
|
|
var sampleConfig = `
|
|
## An array of address to gather stats about. Specify an ip on hostname
|
|
## with optional port. ie localhost, 10.0.0.1:11211, etc.
|
|
servers = ["localhost:11211"]
|
|
# unix_sockets = ["/var/run/memcached.sock"]
|
|
`
|
|
|
|
var defaultTimeout = 5 * time.Second
|
|
|
|
// The list of metrics that should be sent
|
|
var sendMetrics = []string{
|
|
"get_hits",
|
|
"get_misses",
|
|
"evictions",
|
|
"limit_maxbytes",
|
|
"bytes",
|
|
"uptime",
|
|
"curr_items",
|
|
"total_items",
|
|
"curr_connections",
|
|
"total_connections",
|
|
"connection_structures",
|
|
"cmd_get",
|
|
"cmd_set",
|
|
"delete_hits",
|
|
"delete_misses",
|
|
"incr_hits",
|
|
"incr_misses",
|
|
"decr_hits",
|
|
"decr_misses",
|
|
"cas_hits",
|
|
"cas_misses",
|
|
"bytes_read",
|
|
"bytes_written",
|
|
"threads",
|
|
"conn_yields",
|
|
}
|
|
|
|
// SampleConfig returns sample configuration message
|
|
func (m *Memcached) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description returns description of Memcached plugin
|
|
func (m *Memcached) Description() string {
|
|
return "Read metrics from one or many memcached servers"
|
|
}
|
|
|
|
// Gather reads stats from all configured servers accumulates stats
|
|
func (m *Memcached) Gather(acc telegraf.Accumulator) error {
|
|
if len(m.Servers) == 0 && len(m.UnixSockets) == 0 {
|
|
return m.gatherServer(":11211", false, acc)
|
|
}
|
|
|
|
errChan := errchan.New(len(m.Servers) + len(m.UnixSockets))
|
|
for _, serverAddress := range m.Servers {
|
|
errChan.C <- m.gatherServer(serverAddress, false, acc)
|
|
}
|
|
|
|
for _, unixAddress := range m.UnixSockets {
|
|
errChan.C <- m.gatherServer(unixAddress, true, acc)
|
|
}
|
|
|
|
return errChan.Error()
|
|
}
|
|
|
|
func (m *Memcached) gatherServer(
|
|
address string,
|
|
unix bool,
|
|
acc telegraf.Accumulator,
|
|
) error {
|
|
var conn net.Conn
|
|
var err error
|
|
if unix {
|
|
conn, err = net.DialTimeout("unix", address, defaultTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
} else {
|
|
_, _, err = net.SplitHostPort(address)
|
|
if err != nil {
|
|
address = address + ":11211"
|
|
}
|
|
|
|
conn, err = net.DialTimeout("tcp", address, defaultTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
}
|
|
|
|
if conn == nil {
|
|
return fmt.Errorf("Failed to create net connection")
|
|
}
|
|
|
|
// Extend connection
|
|
conn.SetDeadline(time.Now().Add(defaultTimeout))
|
|
|
|
// Read and write buffer
|
|
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
|
|
|
// Send command
|
|
if _, err := fmt.Fprint(rw, "stats\r\n"); err != nil {
|
|
return err
|
|
}
|
|
if err := rw.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
values, err := parseResponse(rw.Reader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Add server address as a tag
|
|
tags := map[string]string{"server": address}
|
|
|
|
// Process values
|
|
fields := make(map[string]interface{})
|
|
for _, key := range sendMetrics {
|
|
if value, ok := values[key]; ok {
|
|
// Mostly it is the number
|
|
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse == nil {
|
|
fields[key] = iValue
|
|
} else {
|
|
fields[key] = value
|
|
}
|
|
}
|
|
}
|
|
acc.AddFields("memcached", fields, tags)
|
|
return nil
|
|
}
|
|
|
|
func parseResponse(r *bufio.Reader) (map[string]string, error) {
|
|
values := make(map[string]string)
|
|
|
|
for {
|
|
// Read line
|
|
line, _, errRead := r.ReadLine()
|
|
if errRead != nil {
|
|
return values, errRead
|
|
}
|
|
// Done
|
|
if bytes.Equal(line, []byte("END")) {
|
|
break
|
|
}
|
|
// Read values
|
|
s := bytes.SplitN(line, []byte(" "), 3)
|
|
if len(s) != 3 || !bytes.Equal(s[0], []byte("STAT")) {
|
|
return values, fmt.Errorf("unexpected line in stats response: %q", line)
|
|
}
|
|
|
|
// Save values
|
|
values[string(s[1])] = string(s[2])
|
|
}
|
|
return values, nil
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("memcached", func() telegraf.Input {
|
|
return &Memcached{}
|
|
})
|
|
}
|