package mcrouter import ( "bufio" "context" "fmt" "net" "net/url" "strconv" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) // Mcrouter is a mcrouter plugin type Mcrouter struct { Servers []string Timeout internal.Duration } // enum for statType type statType int const ( typeInt statType = iota typeFloat statType = iota ) var sampleConfig = ` ## An array of address to gather stats about. Specify an ip or hostname ## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc. servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"] ## Timeout for metric collections from all servers. Minimum timeout is "1s". # timeout = "5s" ` var defaultTimeout = 5 * time.Second var defaultServerURL = url.URL{ Scheme: "tcp", Host: "localhost:11211", } // The list of metrics that should be sent var sendMetrics = map[string]statType{ "uptime": typeInt, "num_servers": typeInt, "num_servers_new": typeInt, "num_servers_up": typeInt, "num_servers_down": typeInt, "num_servers_closed": typeInt, "num_clients": typeInt, "num_suspect_servers": typeInt, "destination_batches_sum": typeInt, "destination_requests_sum": typeInt, "outstanding_route_get_reqs_queued": typeInt, "outstanding_route_update_reqs_queued": typeInt, "outstanding_route_get_avg_queue_size": typeInt, "outstanding_route_update_avg_queue_size": typeInt, "outstanding_route_get_avg_wait_time_sec": typeInt, "outstanding_route_update_avg_wait_time_sec": typeInt, "retrans_closed_connections": typeInt, "destination_pending_reqs": typeInt, "destination_inflight_reqs": typeInt, "destination_batch_size": typeInt, "asynclog_requests": typeInt, "proxy_reqs_processing": typeInt, "proxy_reqs_waiting": typeInt, "client_queue_notify_period": typeInt, "rusage_system": typeFloat, "rusage_user": typeFloat, "ps_num_minor_faults": typeInt, "ps_num_major_faults": typeInt, "ps_user_time_sec": typeFloat, "ps_system_time_sec": typeFloat, "ps_vsize": typeInt, "ps_rss": typeInt, "fibers_allocated": typeInt, "fibers_pool_size": typeInt, "fibers_stack_high_watermark": typeInt, "successful_client_connections": typeInt, "duration_us": typeInt, "destination_max_pending_reqs": typeInt, "destination_max_inflight_reqs": typeInt, "retrans_per_kbyte_max": typeInt, "cmd_get_count": typeInt, "cmd_delete_out": typeInt, "cmd_lease_get": typeInt, "cmd_set": typeInt, "cmd_get_out_all": typeInt, "cmd_get_out": typeInt, "cmd_lease_set_count": typeInt, "cmd_other_out_all": typeInt, "cmd_lease_get_out": typeInt, "cmd_set_count": typeInt, "cmd_lease_set_out": typeInt, "cmd_delete_count": typeInt, "cmd_other": typeInt, "cmd_delete": typeInt, "cmd_get": typeInt, "cmd_lease_set": typeInt, "cmd_set_out": typeInt, "cmd_lease_get_count": typeInt, "cmd_other_out": typeInt, "cmd_lease_get_out_all": typeInt, "cmd_set_out_all": typeInt, "cmd_other_count": typeInt, "cmd_delete_out_all": typeInt, "cmd_lease_set_out_all": typeInt, } // SampleConfig returns sample configuration message func (m *Mcrouter) SampleConfig() string { return sampleConfig } // Description returns description of Mcrouter plugin func (m *Mcrouter) Description() string { return "Read metrics from one or many mcrouter servers" } // Gather reads stats from all configured servers accumulates stats func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { ctx := context.Background() if m.Timeout.Duration < 1*time.Second { m.Timeout.Duration = defaultTimeout } ctx, cancel := context.WithTimeout(ctx, m.Timeout.Duration) defer cancel() if len(m.Servers) == 0 { m.Servers = []string{defaultServerURL.String()} } for _, serverAddress := range m.Servers { acc.AddError(m.gatherServer(ctx, serverAddress, acc)) } return nil } // ParseAddress parses an address string into 'host:port' and 'protocol' parts func (m *Mcrouter) ParseAddress(address string) (string, string, error) { var protocol string var host string var port string u, parseError := url.Parse(address) if parseError != nil { return "", "", fmt.Errorf("Invalid server address") } if u.Scheme != "tcp" && u.Scheme != "unix" { return "", "", fmt.Errorf("Invalid server protocol") } protocol = u.Scheme if protocol == "unix" { if u.Path == "" { return "", "", fmt.Errorf("Invalid unix socket path") } address = u.Path } else { if u.Host == "" { return "", "", fmt.Errorf("Invalid host") } host = u.Hostname() port = u.Port() if host == "" { host = defaultServerURL.Hostname() } if port == "" { port = defaultServerURL.Port() } address = host + ":" + port } return address, protocol, nil } func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error { var conn net.Conn var err error var protocol string var dialer net.Dialer address, protocol, err = m.ParseAddress(address) conn, err = dialer.DialContext(ctx, protocol, address) if err != nil { return err } defer conn.Close() // Extend connection deadline, ok := ctx.Deadline() if ok { conn.SetDeadline(deadline) } // Read and write buffer reader := bufio.NewReader(conn) scanner := bufio.NewScanner(reader) // Send command if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil { return err } values, err := parseResponse(scanner) if err != nil { return err } // Add server address as a tag tags := map[string]string{"server": address} // Process values fields := make(map[string]interface{}) for key, sType := range sendMetrics { if value, ok := values[key]; ok { switch sType { case typeInt: if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil { fields[key] = v } case typeFloat: if v, errParse := strconv.ParseFloat(value, 64); errParse == nil { fields[key] = v } default: } } } acc.AddFields("mcrouter", fields, tags) return nil } func parseResponse(r *bufio.Scanner) (map[string]string, error) { values := make(map[string]string) for r.Scan() { // Read line line := r.Text() // Done if line == "END" { break } // Read values s := strings.SplitN(line, " ", 3) if len(s) != 3 || s[0] != "STAT" { return nil, fmt.Errorf("unexpected line in stats response: %s", line) } // Save values values[s[1]] = s[2] } return values, nil } func init() { inputs.Add("mcrouter", func() telegraf.Input { return &Mcrouter{} }) }