Add PHPFPM stat
- HTTP status or Socket status
- Collect those metric:
    accepted conn:
    listen queue:
    max listen queue:
    listen queue len:
    idle processes:
    active processes:
    total processes:
    max active processes:
    max children reached:
    slow requests:
- Tag metric with: `host` and `pool` name
Closes #255
			
			
This commit is contained in:
		
							parent
							
								
									d394003739
								
							
						
					
					
						commit
						0bc76f094a
					
				|  | @ -14,6 +14,7 @@ import ( | |||
| 	_ "github.com/influxdb/telegraf/plugins/mongodb" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/mysql" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/nginx" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/phpfpm" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/ping" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/postgresql" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/procstat" | ||||
|  |  | |||
|  | @ -0,0 +1,73 @@ | |||
| # Telegraf plugin: phpfpm | ||||
| 
 | ||||
| Get phpfpm stat using either HTTP status page or fpm socket. | ||||
| 
 | ||||
| # Measurements | ||||
| 
 | ||||
| Meta: | ||||
| 
 | ||||
| - tags: `url=<ip> pool=poolname` | ||||
| 
 | ||||
| Measurement names: | ||||
| 
 | ||||
| - accepted_conn | ||||
| - listen_queue | ||||
| - max_listen_queue | ||||
| - listen_queue_len | ||||
| - idle_processes | ||||
| - active_processes | ||||
| - total_processes | ||||
| - max_active_processes | ||||
| - max_children_reached | ||||
| - slow_requests | ||||
| 
 | ||||
| # Example output | ||||
| 
 | ||||
| Using this configuration: | ||||
| 
 | ||||
| ``` | ||||
| [phpfpm] | ||||
|   # An array of address to gather stats about. Specify an ip on hostname | ||||
|   # with optional port and path. ie localhost, 10.10.3.33/server-status, etc. | ||||
|   # | ||||
|   # We can configure int two modes: | ||||
|   #   - unixsocket: the string is the path to fpm socket like | ||||
|   #      /var/run/php5-fpm.sock | ||||
|   #   - http: the URL has to start with http:// or https:// | ||||
|   # | ||||
|   # If no servers are specified, then default to 127.0.0.1/server-status | ||||
|   urls = ["http://localhost/status", "10.0.0.12:/var/run/php5-fpm-www2.sock"] | ||||
| ``` | ||||
| 
 | ||||
| When run with: | ||||
| 
 | ||||
| ``` | ||||
| ./telegraf -config telegraf.conf -filter phpfpm -test | ||||
| ``` | ||||
| 
 | ||||
| It produces: | ||||
| 
 | ||||
| ``` | ||||
| * Plugin: phpfpm, Collection 1 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_idle_processes value=1 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_total_processes value=2 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_max_children_reached value=0 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_max_listen_queue value=0 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_listen_queue value=0 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_listen_queue_len value=0 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_active_processes value=1 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_max_active_processes value=2 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_slow_requests value=0 | ||||
| > [url="10.0.0.12" pool="www"] phpfpm_accepted_conn value=305 | ||||
| 
 | ||||
| > [url="localhost" pool="www2"] phpfpm_max_children_reached value=0 | ||||
| > [url="localhost" pool="www2"] phpfpm_slow_requests value=0 | ||||
| > [url="localhost" pool="www2"] phpfpm_max_listen_queue value=0 | ||||
| > [url="localhost" pool="www2"] phpfpm_active_processes value=1 | ||||
| > [url="localhost" pool="www2"] phpfpm_listen_queue_len value=0 | ||||
| > [url="localhost" pool="www2"] phpfpm_idle_processes value=1 | ||||
| > [url="localhost" pool="www2"] phpfpm_total_processes value=2 | ||||
| > [url="localhost" pool="www2"] phpfpm_max_active_processes value=2 | ||||
| > [url="localhost" pool="www2"] phpfpm_accepted_conn value=306 | ||||
| > [url="localhost" pool="www2"] phpfpm_listen_queue value=0 | ||||
| ``` | ||||
|  | @ -0,0 +1,194 @@ | |||
| package phpfpm | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 
 | ||||
| 	"github.com/influxdb/telegraf/plugins" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	PF_POOL                 = "pool" | ||||
| 	PF_PROCESS_MANAGER      = "process manager" | ||||
| 	PF_ACCEPTED_CONN        = "accepted conn" | ||||
| 	PF_LISTEN_QUEUE         = "listen queue" | ||||
| 	PF_MAX_LISTEN_QUEUE     = "max listen queue" | ||||
| 	PF_LISTEN_QUEUE_LEN     = "listen queue len" | ||||
| 	PF_IDLE_PROCESSES       = "idle processes" | ||||
| 	PF_ACTIVE_PROCESSES     = "active processes" | ||||
| 	PF_TOTAL_PROCESSES      = "total processes" | ||||
| 	PF_MAX_ACTIVE_PROCESSES = "max active processes" | ||||
| 	PF_MAX_CHILDREN_REACHED = "max children reached" | ||||
| 	PF_SLOW_REQUESTS        = "slow requests" | ||||
| ) | ||||
| 
 | ||||
| type metric map[string]int64 | ||||
| type poolStat map[string]metric | ||||
| 
 | ||||
| type phpfpm struct { | ||||
| 	Urls []string | ||||
| 
 | ||||
| 	client *http.Client | ||||
| } | ||||
| 
 | ||||
| var sampleConfig = ` | ||||
| 	# An array of addresses to gather stats about. Specify an ip or hostname | ||||
| 	# with optional port and path. | ||||
| 	# | ||||
| 	# Plugin can be configured in two modes (both can be used): | ||||
| 	#   - http: the URL must start with http:// or https://, ex:
 | ||||
| 	#		"http://localhost/status" | ||||
| 	#		"http://192.168.130.1/status?full" | ||||
| 	#   - unixsocket: path to fpm socket, ex: | ||||
| 	#		"/var/run/php5-fpm.sock" | ||||
| 	#		"192.168.10.10:/var/run/php5-fpm-www2.sock" | ||||
| 	# | ||||
| 	# If no servers are specified, then default to 127.0.0.1/server-status | ||||
| 	urls = ["http://localhost/status"] | ||||
| ` | ||||
| 
 | ||||
| func (r *phpfpm) SampleConfig() string { | ||||
| 	return sampleConfig | ||||
| } | ||||
| 
 | ||||
| func (r *phpfpm) Description() string { | ||||
| 	return "Read metrics of phpfpm, via HTTP status page or socket(pending)" | ||||
| } | ||||
| 
 | ||||
| // Reads stats from all configured servers accumulates stats.
 | ||||
| // Returns one of the errors encountered while gather stats (if any).
 | ||||
| func (g *phpfpm) Gather(acc plugins.Accumulator) error { | ||||
| 	if len(g.Urls) == 0 { | ||||
| 		return g.gatherServer("http://127.0.0.1/status", acc) | ||||
| 	} | ||||
| 
 | ||||
| 	var wg sync.WaitGroup | ||||
| 
 | ||||
| 	var outerr error | ||||
| 
 | ||||
| 	for _, serv := range g.Urls { | ||||
| 		wg.Add(1) | ||||
| 		go func(serv string) { | ||||
| 			defer wg.Done() | ||||
| 			outerr = g.gatherServer(serv, acc) | ||||
| 		}(serv) | ||||
| 	} | ||||
| 
 | ||||
| 	wg.Wait() | ||||
| 
 | ||||
| 	return outerr | ||||
| } | ||||
| 
 | ||||
| // Request status page to get stat raw data
 | ||||
| func (g *phpfpm) gatherServer(addr string, acc plugins.Accumulator) error { | ||||
| 	if g.client == nil { | ||||
| 
 | ||||
| 		client := &http.Client{} | ||||
| 		g.client = client | ||||
| 	} | ||||
| 
 | ||||
| 	if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { | ||||
| 		u, err := url.Parse(addr) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("Unable parse server address '%s': %s", addr, err) | ||||
| 		} | ||||
| 
 | ||||
| 		req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme, | ||||
| 			u.Host, u.Path), nil) | ||||
| 		res, err := g.client.Do(req) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v", | ||||
| 				addr, err) | ||||
| 		} | ||||
| 
 | ||||
| 		if res.StatusCode != 200 { | ||||
| 			return fmt.Errorf("Unable to get valid stat result from '%s': %v", | ||||
| 				addr, err) | ||||
| 		} | ||||
| 
 | ||||
| 		importMetric(res.Body, acc, u.Host) | ||||
| 	} else { | ||||
| 		socketAddr := strings.Split(addr, ":") | ||||
| 
 | ||||
| 		fcgi, _ := NewClient("unix", socketAddr[1]) | ||||
| 		resOut, resErr, err := fcgi.Request(map[string]string{ | ||||
| 			"SCRIPT_NAME":     "/status", | ||||
| 			"SCRIPT_FILENAME": "status", | ||||
| 			"REQUEST_METHOD":  "GET", | ||||
| 		}, "") | ||||
| 
 | ||||
| 		if len(resErr) == 0 && err == nil { | ||||
| 			importMetric(bytes.NewReader(resOut), acc, socketAddr[0]) | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Import HTTP stat data into Telegraf system
 | ||||
| func importMetric(r io.Reader, acc plugins.Accumulator, host string) (poolStat, error) { | ||||
| 	stats := make(poolStat) | ||||
| 	var currentPool string | ||||
| 
 | ||||
| 	scanner := bufio.NewScanner(r) | ||||
| 	for scanner.Scan() { | ||||
| 		statLine := scanner.Text() | ||||
| 		keyvalue := strings.Split(statLine, ":") | ||||
| 
 | ||||
| 		if len(keyvalue) < 2 { | ||||
| 			continue | ||||
| 		} | ||||
| 		fieldName := strings.Trim(keyvalue[0], " ") | ||||
| 		// We start to gather data for a new pool here
 | ||||
| 		if fieldName == PF_POOL { | ||||
| 			currentPool = strings.Trim(keyvalue[1], " ") | ||||
| 			stats[currentPool] = make(metric) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// Start to parse metric for current pool
 | ||||
| 		switch fieldName { | ||||
| 		case PF_ACCEPTED_CONN, | ||||
| 			PF_LISTEN_QUEUE, | ||||
| 			PF_MAX_LISTEN_QUEUE, | ||||
| 			PF_LISTEN_QUEUE_LEN, | ||||
| 			PF_IDLE_PROCESSES, | ||||
| 			PF_ACTIVE_PROCESSES, | ||||
| 			PF_TOTAL_PROCESSES, | ||||
| 			PF_MAX_ACTIVE_PROCESSES, | ||||
| 			PF_MAX_CHILDREN_REACHED, | ||||
| 			PF_SLOW_REQUESTS: | ||||
| 			fieldValue, err := strconv.ParseInt(strings.Trim(keyvalue[1], " "), 10, 64) | ||||
| 			if err == nil { | ||||
| 				stats[currentPool][fieldName] = fieldValue | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Finally, we push the pool metric
 | ||||
| 	for pool := range stats { | ||||
| 		tags := map[string]string{ | ||||
| 			"url":  host, | ||||
| 			"pool": pool, | ||||
| 		} | ||||
| 		for k, v := range stats[pool] { | ||||
| 			acc.Add(strings.Replace(k, " ", "_", -1), v, tags) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return stats, nil | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	plugins.Add("phpfpm", func() plugins.Plugin { | ||||
| 		return &phpfpm{} | ||||
| 	}) | ||||
| } | ||||
|  | @ -0,0 +1,321 @@ | |||
| package phpfpm | ||||
| 
 | ||||
| // FastCGI client to request via socket
 | ||||
| 
 | ||||
| // Copyright 2012 Junqing Tan <ivan@mysqlab.net> and The Go Authors
 | ||||
| // Use of this source code is governed by a BSD-style
 | ||||
| // Part of source code is from Go fcgi package
 | ||||
| 
 | ||||
| // Fix bug: Can't recive more than 1 record untill FCGI_END_REQUEST 2012-09-15
 | ||||
| // By: wofeiwo
 | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"encoding/binary" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"net" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| ) | ||||
| 
 | ||||
| const FCGI_LISTENSOCK_FILENO uint8 = 0 | ||||
| const FCGI_HEADER_LEN uint8 = 8 | ||||
| const VERSION_1 uint8 = 1 | ||||
| const FCGI_NULL_REQUEST_ID uint8 = 0 | ||||
| const FCGI_KEEP_CONN uint8 = 1 | ||||
| 
 | ||||
| const ( | ||||
| 	FCGI_BEGIN_REQUEST uint8 = iota + 1 | ||||
| 	FCGI_ABORT_REQUEST | ||||
| 	FCGI_END_REQUEST | ||||
| 	FCGI_PARAMS | ||||
| 	FCGI_STDIN | ||||
| 	FCGI_STDOUT | ||||
| 	FCGI_STDERR | ||||
| 	FCGI_DATA | ||||
| 	FCGI_GET_VALUES | ||||
| 	FCGI_GET_VALUES_RESULT | ||||
| 	FCGI_UNKNOWN_TYPE | ||||
| 	FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	FCGI_RESPONDER uint8 = iota + 1 | ||||
| 	FCGI_AUTHORIZER | ||||
| 	FCGI_FILTER | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	FCGI_REQUEST_COMPLETE uint8 = iota | ||||
| 	FCGI_CANT_MPX_CONN | ||||
| 	FCGI_OVERLOADED | ||||
| 	FCGI_UNKNOWN_ROLE | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	FCGI_MAX_CONNS  string = "MAX_CONNS" | ||||
| 	FCGI_MAX_REQS   string = "MAX_REQS" | ||||
| 	FCGI_MPXS_CONNS string = "MPXS_CONNS" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	maxWrite = 6553500 // maximum record body
 | ||||
| 	maxPad   = 255 | ||||
| ) | ||||
| 
 | ||||
| type header struct { | ||||
| 	Version       uint8 | ||||
| 	Type          uint8 | ||||
| 	Id            uint16 | ||||
| 	ContentLength uint16 | ||||
| 	PaddingLength uint8 | ||||
| 	Reserved      uint8 | ||||
| } | ||||
| 
 | ||||
| // for padding so we don't have to allocate all the time
 | ||||
| // not synchronized because we don't care what the contents are
 | ||||
| var pad [maxPad]byte | ||||
| 
 | ||||
| func (h *header) init(recType uint8, reqId uint16, contentLength int) { | ||||
| 	h.Version = 1 | ||||
| 	h.Type = recType | ||||
| 	h.Id = reqId | ||||
| 	h.ContentLength = uint16(contentLength) | ||||
| 	h.PaddingLength = uint8(-contentLength & 7) | ||||
| } | ||||
| 
 | ||||
| type record struct { | ||||
| 	h   header | ||||
| 	buf [maxWrite + maxPad]byte | ||||
| } | ||||
| 
 | ||||
| func (rec *record) read(r io.Reader) (err error) { | ||||
| 	if err = binary.Read(r, binary.BigEndian, &rec.h); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if rec.h.Version != 1 { | ||||
| 		return errors.New("fcgi: invalid header version") | ||||
| 	} | ||||
| 	n := int(rec.h.ContentLength) + int(rec.h.PaddingLength) | ||||
| 	if _, err = io.ReadFull(r, rec.buf[:n]); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (r *record) content() []byte { | ||||
| 	return r.buf[:r.h.ContentLength] | ||||
| } | ||||
| 
 | ||||
| type FCGIClient struct { | ||||
| 	mutex     sync.Mutex | ||||
| 	rwc       io.ReadWriteCloser | ||||
| 	h         header | ||||
| 	buf       bytes.Buffer | ||||
| 	keepAlive bool | ||||
| } | ||||
| 
 | ||||
| func NewClient(h string, args ...interface{}) (fcgi *FCGIClient, err error) { | ||||
| 	var conn net.Conn | ||||
| 	if len(args) != 1 { | ||||
| 		err = errors.New("fcgi: not enough params") | ||||
| 		return | ||||
| 	} | ||||
| 	switch args[0].(type) { | ||||
| 	case int: | ||||
| 		addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) | ||||
| 		conn, err = net.Dial("tcp", addr) | ||||
| 	case string: | ||||
| 		laddr := net.UnixAddr{Name: args[0].(string), Net: h} | ||||
| 		conn, err = net.DialUnix(h, nil, &laddr) | ||||
| 	default: | ||||
| 		err = errors.New("fcgi: we only accept int (port) or string (socket) params.") | ||||
| 	} | ||||
| 	fcgi = &FCGIClient{ | ||||
| 		rwc:       conn, | ||||
| 		keepAlive: false, | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| func (client *FCGIClient) writeRecord(recType uint8, reqId uint16, content []byte) (err error) { | ||||
| 	client.mutex.Lock() | ||||
| 	defer client.mutex.Unlock() | ||||
| 	client.buf.Reset() | ||||
| 	client.h.init(recType, reqId, len(content)) | ||||
| 	if err := binary.Write(&client.buf, binary.BigEndian, client.h); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if _, err := client.buf.Write(content); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if _, err := client.buf.Write(pad[:client.h.PaddingLength]); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	_, err = client.rwc.Write(client.buf.Bytes()) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (client *FCGIClient) writeBeginRequest(reqId uint16, role uint16, flags uint8) error { | ||||
| 	b := [8]byte{byte(role >> 8), byte(role), flags} | ||||
| 	return client.writeRecord(FCGI_BEGIN_REQUEST, reqId, b[:]) | ||||
| } | ||||
| 
 | ||||
| func (client *FCGIClient) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error { | ||||
| 	b := make([]byte, 8) | ||||
| 	binary.BigEndian.PutUint32(b, uint32(appStatus)) | ||||
| 	b[4] = protocolStatus | ||||
| 	return client.writeRecord(FCGI_END_REQUEST, reqId, b) | ||||
| } | ||||
| 
 | ||||
| func (client *FCGIClient) writePairs(recType uint8, reqId uint16, pairs map[string]string) error { | ||||
| 	w := newWriter(client, recType, reqId) | ||||
| 	b := make([]byte, 8) | ||||
| 	for k, v := range pairs { | ||||
| 		n := encodeSize(b, uint32(len(k))) | ||||
| 		n += encodeSize(b[n:], uint32(len(v))) | ||||
| 		if _, err := w.Write(b[:n]); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		if _, err := w.WriteString(k); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		if _, err := w.WriteString(v); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	w.Close() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func readSize(s []byte) (uint32, int) { | ||||
| 	if len(s) == 0 { | ||||
| 		return 0, 0 | ||||
| 	} | ||||
| 	size, n := uint32(s[0]), 1 | ||||
| 	if size&(1<<7) != 0 { | ||||
| 		if len(s) < 4 { | ||||
| 			return 0, 0 | ||||
| 		} | ||||
| 		n = 4 | ||||
| 		size = binary.BigEndian.Uint32(s) | ||||
| 		size &^= 1 << 31 | ||||
| 	} | ||||
| 	return size, n | ||||
| } | ||||
| 
 | ||||
| func readString(s []byte, size uint32) string { | ||||
| 	if size > uint32(len(s)) { | ||||
| 		return "" | ||||
| 	} | ||||
| 	return string(s[:size]) | ||||
| } | ||||
| 
 | ||||
| func encodeSize(b []byte, size uint32) int { | ||||
| 	if size > 127 { | ||||
| 		size |= 1 << 31 | ||||
| 		binary.BigEndian.PutUint32(b, size) | ||||
| 		return 4 | ||||
| 	} | ||||
| 	b[0] = byte(size) | ||||
| 	return 1 | ||||
| } | ||||
| 
 | ||||
| // bufWriter encapsulates bufio.Writer but also closes the underlying stream when
 | ||||
| // Closed.
 | ||||
| type bufWriter struct { | ||||
| 	closer io.Closer | ||||
| 	*bufio.Writer | ||||
| } | ||||
| 
 | ||||
| func (w *bufWriter) Close() error { | ||||
| 	if err := w.Writer.Flush(); err != nil { | ||||
| 		w.closer.Close() | ||||
| 		return err | ||||
| 	} | ||||
| 	return w.closer.Close() | ||||
| } | ||||
| 
 | ||||
| func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter { | ||||
| 	s := &streamWriter{c: c, recType: recType, reqId: reqId} | ||||
| 	w := bufio.NewWriterSize(s, maxWrite) | ||||
| 	return &bufWriter{s, w} | ||||
| } | ||||
| 
 | ||||
| // streamWriter abstracts out the separation of a stream into discrete records.
 | ||||
| // It only writes maxWrite bytes at a time.
 | ||||
| type streamWriter struct { | ||||
| 	c       *FCGIClient | ||||
| 	recType uint8 | ||||
| 	reqId   uint16 | ||||
| } | ||||
| 
 | ||||
| func (w *streamWriter) Write(p []byte) (int, error) { | ||||
| 	nn := 0 | ||||
| 	for len(p) > 0 { | ||||
| 		n := len(p) | ||||
| 		if n > maxWrite { | ||||
| 			n = maxWrite | ||||
| 		} | ||||
| 		if err := w.c.writeRecord(w.recType, w.reqId, p[:n]); err != nil { | ||||
| 			return nn, err | ||||
| 		} | ||||
| 		nn += n | ||||
| 		p = p[n:] | ||||
| 	} | ||||
| 	return nn, nil | ||||
| } | ||||
| 
 | ||||
| func (w *streamWriter) Close() error { | ||||
| 	// send empty record to close the stream
 | ||||
| 	return w.c.writeRecord(w.recType, w.reqId, nil) | ||||
| } | ||||
| 
 | ||||
| func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout []byte, reterr []byte, err error) { | ||||
| 
 | ||||
| 	var reqId uint16 = 1 | ||||
| 	defer client.rwc.Close() | ||||
| 
 | ||||
| 	err = client.writeBeginRequest(reqId, uint16(FCGI_RESPONDER), 0) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	err = client.writePairs(FCGI_PARAMS, reqId, env) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if len(reqStr) > 0 { | ||||
| 		err = client.writeRecord(FCGI_STDIN, reqId, []byte(reqStr)) | ||||
| 		if err != nil { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	rec := &record{} | ||||
| 	var err1 error | ||||
| 
 | ||||
| 	// recive untill EOF or FCGI_END_REQUEST
 | ||||
| 	for { | ||||
| 		err1 = rec.read(client.rwc) | ||||
| 		if err1 != nil { | ||||
| 			if err1 != io.EOF { | ||||
| 				err = err1 | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
| 		switch { | ||||
| 		case rec.h.Type == FCGI_STDOUT: | ||||
| 			retout = append(retout, rec.content()...) | ||||
| 		case rec.h.Type == FCGI_STDERR: | ||||
| 			reterr = append(reterr, rec.content()...) | ||||
| 		case rec.h.Type == FCGI_END_REQUEST: | ||||
| 			fallthrough | ||||
| 		default: | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return | ||||
| } | ||||
|  | @ -0,0 +1,85 @@ | |||
| package phpfpm | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/influxdb/telegraf/testutil" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| ) | ||||
| 
 | ||||
| func TestPhpFpmGeneratesMetrics(t *testing.T) { | ||||
| 	//We create a fake server to return test data
 | ||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		fmt.Fprint(w, outputSample) | ||||
| 	})) | ||||
| 	defer ts.Close() | ||||
| 
 | ||||
| 	//Now we tested again above server, with our authentication data
 | ||||
| 	r := &phpfpm{ | ||||
| 		Urls: []string{ts.URL}, | ||||
| 	} | ||||
| 
 | ||||
| 	var acc testutil.Accumulator | ||||
| 
 | ||||
| 	err := r.Gather(&acc) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	tags := map[string]string{ | ||||
| 		"url":  ts.Listener.Addr().String(), | ||||
| 		"pool": "www", | ||||
| 	} | ||||
| 	assert.NoError(t, acc.ValidateTaggedValue("accepted_conn", int64(3), tags)) | ||||
| 
 | ||||
| 	checkInt := []struct { | ||||
| 		name  string | ||||
| 		value int64 | ||||
| 	}{ | ||||
| 		{"accepted_conn", 3}, | ||||
| 		{"listen_queue", 1}, | ||||
| 		{"max_listen_queue", 0}, | ||||
| 		{"listen_queue_len", 0}, | ||||
| 		{"idle_processes", 1}, | ||||
| 		{"active_processes", 1}, | ||||
| 		{"total_processes", 2}, | ||||
| 		{"max_active_processes", 1}, | ||||
| 		{"max_children_reached", 2}, | ||||
| 		{"slow_requests", 1}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, c := range checkInt { | ||||
| 		assert.Equal(t, true, acc.CheckValue(c.name, c.value)) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| //When not passing server config, we default to localhost
 | ||||
| //We just want to make sure we did request stat from localhost
 | ||||
| func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { | ||||
| 	r := &phpfpm{} | ||||
| 
 | ||||
| 	var acc testutil.Accumulator | ||||
| 
 | ||||
| 	err := r.Gather(&acc) | ||||
| 	require.Error(t, err) | ||||
| 	assert.Contains(t, err.Error(), "127.0.0.1/status") | ||||
| } | ||||
| 
 | ||||
| const outputSample = ` | ||||
| pool:                 www | ||||
| process manager:      dynamic | ||||
| start time:           11/Oct/2015:23:38:51 +0000 | ||||
| start since:          1991 | ||||
| accepted conn:        3 | ||||
| listen queue:         1 | ||||
| max listen queue:     0 | ||||
| listen queue len:     0 | ||||
| idle processes:       1 | ||||
| active processes:     1 | ||||
| total processes:      2 | ||||
| max active processes: 1 | ||||
| max children reached: 2 | ||||
| slow requests:        1 | ||||
| ` | ||||
		Loading…
	
		Reference in New Issue