memcached plugin: support unix sockets

closes #415
This commit is contained in:
Cameron Sparr 2015-12-03 12:08:01 -07:00
parent 7a2eeb7439
commit 03863bd84d
2 changed files with 38 additions and 17 deletions

View File

@ -4,6 +4,7 @@
- [#412](https://github.com/influxdb/telegraf/pull/412): Additional memcached stats. Thanks @mgresser!
- [#410](https://github.com/influxdb/telegraf/pull/410): Additional redis metrics. Thanks @vlaadbrain!
- [#414](https://github.com/influxdb/telegraf/issues/414): Jolokia plugin auth parameters
- [#415](https://github.com/influxdb/telegraf/issues/415): memcached plugin: support unix sockets
### Bugfixes
- [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue

View File

@ -14,6 +14,7 @@ import (
// Memcached is a memcached plugin
type Memcached struct {
Servers []string
UnixSockets []string
}
var sampleConfig = `
@ -21,7 +22,8 @@ var sampleConfig = `
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
servers = ["localhost:11211"]
# unix_sockets = ["/var/run/memcached.sock"]
`
var defaultTimeout = 5 * time.Second
@ -68,12 +70,18 @@ func (m *Memcached) Description() string {
// Gather reads stats from all configured servers accumulates stats
func (m *Memcached) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
return m.gatherServer(":11211", acc)
if len(m.Servers) == 0 && len(m.UnixSockets) == 0 {
return m.gatherServer(":11211", false, acc)
}
for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, acc); err != nil {
if err := m.gatherServer(serverAddress, false, acc); err != nil {
return err
}
}
for _, unixAddress := range m.UnixSockets {
if err := m.gatherServer(unixAddress, true, acc); err != nil {
return err
}
}
@ -81,18 +89,30 @@ func (m *Memcached) Gather(acc plugins.Accumulator) error {
return nil
}
func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
func (m *Memcached) gatherServer(
address string,
unix bool,
acc plugins.Accumulator,
) error {
var conn net.Conn
if unix {
conn, err := net.DialTimeout("unix", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
} else {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}
// Connect
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
conn, err = net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()
}
// Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout))
@ -101,10 +121,10 @@ func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
// Send command
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
if _, err := fmt.Fprint(rw, "stats\r\n"); err != nil {
return err
}
if err = rw.Flush(); err != nil {
if err := rw.Flush(); err != nil {
return err
}