Add input plugin for McRouter (#4077)

This commit is contained in:
Craig Thayer 2018-05-01 11:58:15 -07:00 committed by Daniel Nelson
parent f094f83da5
commit 83345ec2b3
4 changed files with 640 additions and 0 deletions

View File

@ -50,6 +50,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/mcrouter"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mesos"
_ "github.com/influxdata/telegraf/plugins/inputs/minecraft"

View File

@ -0,0 +1,103 @@
# Mcrouter Input Plugin
This plugin gathers statistics data from a Mcrouter server.
### Configuration:
```toml
# Read metrics from one or many mcrouter servers.
[[inputs.mcrouter]]
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]
## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
```
### Measurements & Fields:
The fields from this plugin are gathered in the *mcrouter* measurement.
Descriptions of the gathered fields can be found in the [mcrouter stats list](https://github.com/facebook/mcrouter/wiki/Stats-list).
Fields:
* uptime
* num_servers
* num_servers_new
* num_servers_up
* num_servers_down
* num_servers_closed
* num_clients
* num_suspect_servers
* destination_batches_sum
* destination_requests_sum
* outstanding_route_get_reqs_queued
* outstanding_route_update_reqs_queued
* outstanding_route_get_avg_queue_size
* outstanding_route_update_avg_queue_size
* outstanding_route_get_avg_wait_time_sec
* outstanding_route_update_avg_wait_time_sec
* retrans_closed_connections
* destination_pending_reqs
* destination_inflight_reqs
* destination_batch_size
* asynclog_requests
* proxy_reqs_processing
* proxy_reqs_waiting
* client_queue_notify_period
* rusage_system
* rusage_user
* ps_num_minor_faults
* ps_num_major_faults
* ps_user_time_sec
* ps_system_time_sec
* ps_vsize
* ps_rss
* fibers_allocated
* fibers_pool_size
* fibers_stack_high_watermark
* successful_client_connections
* duration_us
* destination_max_pending_reqs
* destination_max_inflight_reqs
* retrans_per_kbyte_max
* cmd_get_count
* cmd_delete_out
* cmd_lease_get
* cmd_set
* cmd_get_out_all
* cmd_get_out
* cmd_lease_set_count
* cmd_other_out_all
* cmd_lease_get_out
* cmd_set_count
* cmd_lease_set_out
* cmd_delete_count
* cmd_other
* cmd_delete
* cmd_get
* cmd_lease_set
* cmd_set_out
* cmd_lease_get_count
* cmd_other_out
* cmd_lease_get_out_all
* cmd_set_out_all
* cmd_other_count
* cmd_delete_out_all
* cmd_lease_set_out_all
### Tags:
* Mcrouter measurements have the following tags:
- server (the address from which metrics are gathered: `host:port` for tcp servers, or the socket path for unix sockets)
### Example Output:
```
$ ./telegraf --config telegraf.conf --input-filter mcrouter --test
mcrouter,server=localhost:11211 uptime=166,num_servers=1,num_servers_new=1,num_servers_up=0,num_servers_down=0,num_servers_closed=0,num_clients=1,num_suspect_servers=0,destination_batches_sum=0,destination_requests_sum=0,outstanding_route_get_reqs_queued=0,outstanding_route_update_reqs_queued=0,outstanding_route_get_avg_queue_size=0,outstanding_route_update_avg_queue_size=0,outstanding_route_get_avg_wait_time_sec=0,outstanding_route_update_avg_wait_time_sec=0,retrans_closed_connections=0,destination_pending_reqs=0,destination_inflight_reqs=0,destination_batch_size=0,asynclog_requests=0,proxy_reqs_processing=1,proxy_reqs_waiting=0,client_queue_notify_period=0,rusage_system=0.040966,rusage_user=0.020483,ps_num_minor_faults=2490,ps_num_major_faults=11,ps_user_time_sec=0.02,ps_system_time_sec=0.04,ps_vsize=697741312,ps_rss=10563584,fibers_allocated=0,fibers_pool_size=0,fibers_stack_high_watermark=0,successful_client_connections=18,duration_us=0,destination_max_pending_reqs=0,destination_max_inflight_reqs=0,retrans_per_kbyte_max=0,cmd_get_count=0,cmd_delete_out=0,cmd_lease_get=0,cmd_set=0,cmd_get_out_all=0,cmd_get_out=0,cmd_lease_set_count=0,cmd_other_out_all=0,cmd_lease_get_out=0,cmd_set_count=0,cmd_lease_set_out=0,cmd_delete_count=0,cmd_other=0,cmd_delete=0,cmd_get=0,cmd_lease_set=0,cmd_set_out=0,cmd_lease_get_count=0,cmd_other_out=0,cmd_lease_get_out_all=0,cmd_set_out_all=0,cmd_other_count=0,cmd_delete_out_all=0,cmd_lease_set_out_all=0 1453831884664956455
```

View File

@ -0,0 +1,286 @@
package mcrouter
import (
"bufio"
"context"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Mcrouter is the input plugin that collects statistics from one or more
// mcrouter servers.
type Mcrouter struct {
// Servers lists the server URLs to query (tcp://host:port or
// unix:///path/to/socket). When empty, Gather substitutes defaultServerURL.
Servers []string
// Timeout bounds one Gather call across all servers. Gather replaces values
// below one second with defaultTimeout.
Timeout internal.Duration
}
// statType identifies the numeric type that the raw string value of a stat is
// converted to before it is stored as a field.
type statType int

// Supported stat value types.
const (
	typeInt statType = iota
	typeFloat
)
// sampleConfig is the example configuration snippet returned by SampleConfig.
var sampleConfig = `
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]
## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
`
// defaultTimeout is the collection timeout Gather applies when the configured
// timeout is shorter than one second.
var defaultTimeout = 5 * time.Second
// defaultServerURL is the server Gather queries when no servers are
// configured; ParseAddress also takes a missing tcp host or port from it.
var defaultServerURL = url.URL{
Scheme: "tcp",
Host: "localhost:11211",
}
// sendMetrics lists the stats that are reported as fields, keyed by stat name,
// together with the numeric type each raw value is parsed as. Stats returned
// by the server that are not listed here are not reported.
// NOTE(review): the *_avg_* stats are declared typeInt; gatherServer drops a
// value that fails to parse, so confirm mcrouter never reports them as
// fractional numbers.
var sendMetrics = map[string]statType{
"uptime": typeInt,
"num_servers": typeInt,
"num_servers_new": typeInt,
"num_servers_up": typeInt,
"num_servers_down": typeInt,
"num_servers_closed": typeInt,
"num_clients": typeInt,
"num_suspect_servers": typeInt,
"destination_batches_sum": typeInt,
"destination_requests_sum": typeInt,
"outstanding_route_get_reqs_queued": typeInt,
"outstanding_route_update_reqs_queued": typeInt,
"outstanding_route_get_avg_queue_size": typeInt,
"outstanding_route_update_avg_queue_size": typeInt,
"outstanding_route_get_avg_wait_time_sec": typeInt,
"outstanding_route_update_avg_wait_time_sec": typeInt,
"retrans_closed_connections": typeInt,
"destination_pending_reqs": typeInt,
"destination_inflight_reqs": typeInt,
"destination_batch_size": typeInt,
"asynclog_requests": typeInt,
"proxy_reqs_processing": typeInt,
"proxy_reqs_waiting": typeInt,
"client_queue_notify_period": typeInt,
"rusage_system": typeFloat,
"rusage_user": typeFloat,
"ps_num_minor_faults": typeInt,
"ps_num_major_faults": typeInt,
"ps_user_time_sec": typeFloat,
"ps_system_time_sec": typeFloat,
"ps_vsize": typeInt,
"ps_rss": typeInt,
"fibers_allocated": typeInt,
"fibers_pool_size": typeInt,
"fibers_stack_high_watermark": typeInt,
"successful_client_connections": typeInt,
"duration_us": typeInt,
"destination_max_pending_reqs": typeInt,
"destination_max_inflight_reqs": typeInt,
"retrans_per_kbyte_max": typeInt,
"cmd_get_count": typeInt,
"cmd_delete_out": typeInt,
"cmd_lease_get": typeInt,
"cmd_set": typeInt,
"cmd_get_out_all": typeInt,
"cmd_get_out": typeInt,
"cmd_lease_set_count": typeInt,
"cmd_other_out_all": typeInt,
"cmd_lease_get_out": typeInt,
"cmd_set_count": typeInt,
"cmd_lease_set_out": typeInt,
"cmd_delete_count": typeInt,
"cmd_other": typeInt,
"cmd_delete": typeInt,
"cmd_get": typeInt,
"cmd_lease_set": typeInt,
"cmd_set_out": typeInt,
"cmd_lease_get_count": typeInt,
"cmd_other_out": typeInt,
"cmd_lease_get_out_all": typeInt,
"cmd_set_out_all": typeInt,
"cmd_other_count": typeInt,
"cmd_delete_out_all": typeInt,
"cmd_lease_set_out_all": typeInt,
}
// SampleConfig returns the sample configuration snippet for the mcrouter
// plugin (the package-level sampleConfig string).
func (m *Mcrouter) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line, human-readable summary of what the mcrouter
// plugin does.
func (m *Mcrouter) Description() string {
return "Read metrics from one or many mcrouter servers"
}
// Gather queries every configured mcrouter server in turn and reports the
// collected stats, along with any per-server error, to acc. It always returns
// nil.
//
// A timeout shorter than one second is replaced with defaultTimeout and an
// empty server list is replaced with defaultServerURL; both substitutions are
// stored back on the receiver. The timeout is shared by all servers.
func (m *Mcrouter) Gather(acc telegraf.Accumulator) error {
	if m.Timeout.Duration < time.Second {
		m.Timeout.Duration = defaultTimeout
	}

	// One deadline covers the whole collection run, not each server.
	ctx, cancel := context.WithTimeout(context.Background(), m.Timeout.Duration)
	defer cancel()

	if len(m.Servers) == 0 {
		m.Servers = []string{defaultServerURL.String()}
	}

	for _, server := range m.Servers {
		result := m.gatherServer(ctx, server, acc)
		acc.AddError(result)
	}

	return nil
}
// ParseAddress splits a server URL into the address and the network
// ("protocol") that should be handed to a dialer.
//
// Only the tcp and unix schemes are accepted. For unix URLs the returned
// address is the socket path. For tcp URLs it is "host:port", where a missing
// host or port is taken from defaultServerURL and IPv6 literals are wrapped in
// square brackets so the result can be dialed directly.
//
// On failure both returned strings are empty and the error names the part of
// the address that was rejected.
func (m *Mcrouter) ParseAddress(address string) (string, string, error) {
	u, err := url.Parse(address)
	if err != nil {
		return "", "", fmt.Errorf("Invalid server address")
	}

	switch u.Scheme {
	case "unix":
		if u.Path == "" {
			return "", "", fmt.Errorf("Invalid unix socket path")
		}
		return u.Path, u.Scheme, nil

	case "tcp":
		if u.Host == "" {
			return "", "", fmt.Errorf("Invalid host")
		}

		host := u.Hostname()
		if host == "" {
			host = defaultServerURL.Hostname()
		}
		port := u.Port()
		if port == "" {
			port = defaultServerURL.Port()
		}

		// Hostname() strips the brackets from IPv6 literals; JoinHostPort
		// restores them, whereas plain concatenation would produce an
		// ambiguous, undialable "::1:11211".
		return net.JoinHostPort(host, port), u.Scheme, nil

	default:
		return "", "", fmt.Errorf("Invalid server protocol")
	}
}
// gatherServer connects to a single mcrouter server, issues the "stats"
// command and adds the recognised stats to acc as one "mcrouter" point tagged
// with the server address.
//
// address is a server URL as accepted by ParseAddress; an address that fails
// to parse is reported as such instead of being passed on to the dialer. ctx
// bounds the dial and, through the connection deadline, every later read and
// write. Stats that are not listed in sendMetrics, or whose value cannot be
// parsed as the expected numeric type, are skipped.
func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error {
	dialAddress, protocol, err := m.ParseAddress(address)
	if err != nil {
		return fmt.Errorf("%q: %v", address, err)
	}

	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, protocol, dialAddress)
	if err != nil {
		return err
	}
	defer conn.Close()

	// The connection deadline mirrors the context deadline so that reads and
	// writes, which do not observe ctx themselves, also time out.
	if deadline, ok := ctx.Deadline(); ok {
		if err := conn.SetDeadline(deadline); err != nil {
			return err
		}
	}

	// Send command
	if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil {
		return err
	}

	values, err := parseResponse(bufio.NewScanner(conn))
	if err != nil {
		return err
	}

	// Add server address as a tag
	tags := map[string]string{"server": dialAddress}

	// Convert the raw values of the stats we report; anything that does not
	// parse as its declared type is left out of the point.
	fields := make(map[string]interface{})
	for key, sType := range sendMetrics {
		value, ok := values[key]
		if !ok {
			continue
		}

		switch sType {
		case typeInt:
			if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil {
				fields[key] = v
			}
		case typeFloat:
			if v, errParse := strconv.ParseFloat(value, 64); errParse == nil {
				fields[key] = v
			}
		}
	}

	acc.AddFields("mcrouter", fields, tags)
	return nil
}
// parseResponse reads a "stats" reply from r and returns the stats as a map
// from stat name to its raw string value.
//
// Every line before the terminating "END" line must have the form
// "STAT <name> <value>"; the value is everything after the second space, so it
// may itself contain spaces. Reading stops at "END" or when the input is
// exhausted. An error is returned for a malformed line or when the scanner
// itself fails (for example on a read timeout or an over-long line), so that a
// failed read is not mistaken for a complete reply.
func parseResponse(r *bufio.Scanner) (map[string]string, error) {
	values := make(map[string]string)

	for r.Scan() {
		line := r.Text()

		// Done
		if line == "END" {
			break
		}

		// Limit the split to three parts so values containing spaces stay
		// intact.
		s := strings.SplitN(line, " ", 3)
		if len(s) != 3 || s[0] != "STAT" {
			return nil, fmt.Errorf("unexpected line in stats response: %s", line)
		}

		values[s[1]] = s[2]
	}

	// Scan returns false on both EOF and failure; only Err tells them apart.
	if err := r.Err(); err != nil {
		return nil, err
	}

	return values, nil
}
func init() {
inputs.Add("mcrouter", func() telegraf.Input {
return &Mcrouter{}
})
}

View File

@ -0,0 +1,250 @@
package mcrouter
import (
"bufio"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAddressParsing checks which server URLs ParseAddress accepts, how it
// fills in a missing tcp host or port from defaultServerURL, and that rejected
// URLs yield an error together with empty results.
func TestAddressParsing(t *testing.T) {
	m := &Mcrouter{}

	acceptTests := []struct {
		input    string
		address  string
		protocol string
	}{
		{"tcp://localhost:8086", "localhost:8086", "tcp"},
		{"tcp://localhost", "localhost:" + defaultServerURL.Port(), "tcp"},
		{"tcp://localhost:", "localhost:" + defaultServerURL.Port(), "tcp"},
		{"tcp://:8086", defaultServerURL.Hostname() + ":8086", "tcp"},
		{"tcp://:", defaultServerURL.Host, "tcp"},
		{"unix:///var/run/mcrouter.sock", "/var/run/mcrouter.sock", "unix"},
	}

	rejectTests := []string{
		"tcp://",                 // no host at all
		"unix://",                // no socket path
		"http://localhost:11211", // unsupported scheme
	}

	// assert.Equal (rather than assert.True on a comparison) makes a failure
	// show the expected and actual values.
	for _, tt := range acceptTests {
		address, protocol, err := m.ParseAddress(tt.input)
		assert.NoError(t, err, tt.input)
		assert.Equal(t, tt.address, address, tt.input)
		assert.Equal(t, tt.protocol, protocol, tt.input)
	}

	for _, input := range rejectTests {
		address, protocol, err := m.ParseAddress(input)
		assert.Error(t, err, input)
		assert.Empty(t, address, input)
		assert.Empty(t, protocol, input)
	}
}
// TestMcrouterGeneratesMetrics is an integration test: it gathers from the
// server at testutil.GetLocalHost() and checks that every expected field was
// recorded on the "mcrouter" measurement with the expected type. It is skipped
// in short mode.
func TestMcrouterGeneratesMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
m := &Mcrouter{
Servers: []string{"tcp://" + testutil.GetLocalHost()},
}
var acc testutil.Accumulator
err := acc.GatherError(m.Gather)
require.NoError(t, err)
// Fields expected to be recorded as int64.
intMetrics := []string{"uptime", "num_servers", "num_servers_new", "num_servers_up",
"num_servers_down", "num_servers_closed", "num_clients",
"num_suspect_servers", "destination_batches_sum", "destination_requests_sum",
"outstanding_route_get_reqs_queued", "outstanding_route_update_reqs_queued",
"outstanding_route_get_avg_queue_size", "outstanding_route_update_avg_queue_size",
"outstanding_route_get_avg_wait_time_sec", "outstanding_route_update_avg_wait_time_sec",
"retrans_closed_connections", "destination_pending_reqs", "destination_inflight_reqs",
"destination_batch_size", "asynclog_requests", "proxy_reqs_processing",
"proxy_reqs_waiting", "client_queue_notify_period",
"ps_num_minor_faults", "ps_num_major_faults",
"ps_vsize", "ps_rss", "fibers_allocated", "fibers_pool_size", "fibers_stack_high_watermark",
"successful_client_connections", "duration_us", "destination_max_pending_reqs",
"destination_max_inflight_reqs", "retrans_per_kbyte_max", "cmd_get_count", "cmd_delete_out",
"cmd_lease_get", "cmd_set", "cmd_get_out_all", "cmd_get_out", "cmd_lease_set_count",
"cmd_other_out_all", "cmd_lease_get_out", "cmd_set_count", "cmd_lease_set_out",
"cmd_delete_count", "cmd_other", "cmd_delete", "cmd_get", "cmd_lease_set", "cmd_set_out",
"cmd_lease_get_count", "cmd_other_out", "cmd_lease_get_out_all", "cmd_set_out_all",
"cmd_other_count", "cmd_delete_out_all", "cmd_lease_set_out_all"}
// Fields expected to be recorded as float64.
floatMetrics := []string{"rusage_system", "rusage_user", "ps_user_time_sec", "ps_system_time_sec"}
for _, metric := range intMetrics {
assert.True(t, acc.HasInt64Field("mcrouter", metric), metric)
}
for _, metric := range floatMetrics {
assert.True(t, acc.HasFloatField("mcrouter", metric), metric)
}
}
// TestMcrouterParseMetrics feeds the mcrouterStats fixture through
// parseResponse and checks that each expected stat is present with the exact
// raw string value from the fixture.
func TestMcrouterParseMetrics(t *testing.T) {
r := bufio.NewReader(strings.NewReader(mcrouterStats))
scanner := bufio.NewScanner(r)
values, err := parseResponse(scanner)
require.NoError(t, err, "Error parsing mcrouter response")
// Expected raw (unconverted) value for each stat name.
tests := []struct {
key string
value string
}{
{"uptime", "166"},
{"num_servers", "1"},
{"num_servers_new", "1"},
{"num_servers_up", "0"},
{"num_servers_down", "0"},
{"num_servers_closed", "0"},
{"num_clients", "1"},
{"num_suspect_servers", "0"},
{"destination_batches_sum", "0"},
{"destination_requests_sum", "0"},
{"outstanding_route_get_reqs_queued", "0"},
{"outstanding_route_update_reqs_queued", "0"},
{"outstanding_route_get_avg_queue_size", "0"},
{"outstanding_route_update_avg_queue_size", "0"},
{"outstanding_route_get_avg_wait_time_sec", "0"},
{"outstanding_route_update_avg_wait_time_sec", "0"},
{"retrans_closed_connections", "0"},
{"destination_pending_reqs", "0"},
{"destination_inflight_reqs", "0"},
{"destination_batch_size", "0"},
{"asynclog_requests", "0"},
{"proxy_reqs_processing", "1"},
{"proxy_reqs_waiting", "0"},
{"client_queue_notify_period", "0"},
{"rusage_system", "0.040966"},
{"rusage_user", "0.020483"},
{"ps_num_minor_faults", "2490"},
{"ps_num_major_faults", "11"},
{"ps_user_time_sec", "0.02"},
{"ps_system_time_sec", "0.04"},
{"ps_vsize", "697741312"},
{"ps_rss", "10563584"},
{"fibers_allocated", "0"},
{"fibers_pool_size", "0"},
{"fibers_stack_high_watermark", "0"},
{"successful_client_connections", "18"},
{"duration_us", "0"},
{"destination_max_pending_reqs", "0"},
{"destination_max_inflight_reqs", "0"},
{"retrans_per_kbyte_max", "0"},
{"cmd_get_count", "0"},
{"cmd_delete_out", "0"},
{"cmd_lease_get", "0"},
{"cmd_set", "0"},
{"cmd_get_out_all", "0"},
{"cmd_get_out", "0"},
{"cmd_lease_set_count", "0"},
{"cmd_other_out_all", "0"},
{"cmd_lease_get_out", "0"},
{"cmd_set_count", "0"},
{"cmd_lease_set_out", "0"},
{"cmd_delete_count", "0"},
{"cmd_other", "0"},
{"cmd_delete", "0"},
{"cmd_get", "0"},
{"cmd_lease_set", "0"},
{"cmd_set_out", "0"},
{"cmd_lease_get_count", "0"},
{"cmd_other_out", "0"},
{"cmd_lease_get_out_all", "0"},
{"cmd_set_out_all", "0"},
{"cmd_other_count", "0"},
{"cmd_delete_out_all", "0"},
{"cmd_lease_set_out_all", "0"},
}
for _, test := range tests {
value, ok := values[test.key]
if !ok {
t.Errorf("Did not find key for metric %s in values", test.key)
// Nothing to compare for a missing key; move on to the next stat.
continue
}
if value != test.value {
t.Errorf("Metric: %s, Expected: %s, actual: %s",
test.key, test.value, value)
}
}
}
// mcrouterStats is a sample "stats" reply, terminated by an END line, used as
// the input fixture for TestMcrouterParseMetrics. It also contains stats
// (version, commandargs, pid, ...) that the test does not assert on.
var mcrouterStats = `STAT version 36.0.0 mcrouter
STAT commandargs --port 11211 --config-file /etc/mcrouter/mcrouter.json --async-dir /var/spool/mcrouter --log-path /var/log/mcrouter/mcrouter.log --stats-root /var/mcrouter/stats --server-timeout 100 --reset-inactive-connection-interval 10000 --proxy-threads auto
STAT pid 21357
STAT parent_pid 1
STAT time 1524673265
STAT uptime 166
STAT num_servers 1
STAT num_servers_new 1
STAT num_servers_up 0
STAT num_servers_down 0
STAT num_servers_closed 0
STAT num_clients 1
STAT num_suspect_servers 0
STAT destination_batches_sum 0
STAT destination_requests_sum 0
STAT outstanding_route_get_reqs_queued 0
STAT outstanding_route_update_reqs_queued 0
STAT outstanding_route_get_avg_queue_size 0
STAT outstanding_route_update_avg_queue_size 0
STAT outstanding_route_get_avg_wait_time_sec 0
STAT outstanding_route_update_avg_wait_time_sec 0
STAT retrans_closed_connections 0
STAT destination_pending_reqs 0
STAT destination_inflight_reqs 0
STAT destination_batch_size 0
STAT asynclog_requests 0
STAT proxy_reqs_processing 1
STAT proxy_reqs_waiting 0
STAT client_queue_notify_period 0
STAT rusage_system 0.040966
STAT rusage_user 0.020483
STAT ps_num_minor_faults 2490
STAT ps_num_major_faults 11
STAT ps_user_time_sec 0.02
STAT ps_system_time_sec 0.04
STAT ps_vsize 697741312
STAT ps_rss 10563584
STAT fibers_allocated 0
STAT fibers_pool_size 0
STAT fibers_stack_high_watermark 0
STAT successful_client_connections 18
STAT duration_us 0
STAT destination_max_pending_reqs 0
STAT destination_max_inflight_reqs 0
STAT retrans_per_kbyte_max 0
STAT cmd_get_count 0
STAT cmd_delete_out 0
STAT cmd_lease_get 0
STAT cmd_set 0
STAT cmd_get_out_all 0
STAT cmd_get_out 0
STAT cmd_lease_set_count 0
STAT cmd_other_out_all 0
STAT cmd_lease_get_out 0
STAT cmd_set_count 0
STAT cmd_lease_set_out 0
STAT cmd_delete_count 0
STAT cmd_other 0
STAT cmd_delete 0
STAT cmd_get 0
STAT cmd_lease_set 0
STAT cmd_set_out 0
STAT cmd_lease_get_count 0
STAT cmd_other_out 0
STAT cmd_lease_get_out_all 0
STAT cmd_set_out_all 0
STAT cmd_other_count 0
STAT cmd_delete_out_all 0
STAT cmd_lease_set_out_all 0
END
`