phpfpm plugin: enhance socket gathering and config

- If we detect errors when gathering stat via socket, return those error
  so it canbe appear in Telegraf log
- Improve fcgi client, also upgrade it to current version of Go at
  https://golang.org/src/net/http/fcgi/fcgi.go
- Add test for unix socket and fcgi
  to remotely connect but only as an extra url field.
- Allow customization of fpm status path
- Document about using of `host` in case `unixsocket` that it isn't used
- Documet upgrade for new data layout

closes #499
closes #502
closes #538
This commit is contained in:
Vinh 2016-01-14 14:20:59 -08:00 committed by Cameron Sparr
parent a712036b56
commit 5af6974796
4 changed files with 406 additions and 205 deletions

View File

@ -6,10 +6,14 @@ Get phpfpm stat using either HTTP status page or fpm socket.
Meta: Meta:
- tags: `url=<ip> pool=poolname` - tags: `pool=poolname`
Measurement names: Measurement names:
- phpfpm
Measurement field:
- accepted_conn - accepted_conn
- listen_queue - listen_queue
- max_listen_queue - max_listen_queue
@ -50,36 +54,12 @@ It produces:
``` ```
* Plugin: phpfpm, Collection 1 * Plugin: phpfpm, Collection 1
> [url="10.0.0.12" pool="www"] phpfpm_idle_processes value=1 > phpfpm,pool=www accepted_conn=13i,active_processes=2i,idle_processes=1i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083331187
> [url="10.0.0.12" pool="www"] phpfpm_total_processes value=2 > phpfpm,pool=www2 accepted_conn=12i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691422
> [url="10.0.0.12" pool="www"] phpfpm_max_children_reached value=0 > phpfpm,pool=www3 accepted_conn=11i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691658
> [url="10.0.0.12" pool="www"] phpfpm_max_listen_queue value=0
> [url="10.0.0.12" pool="www"] phpfpm_listen_queue value=0
> [url="10.0.0.12" pool="www"] phpfpm_listen_queue_len value=0
> [url="10.0.0.12" pool="www"] phpfpm_active_processes value=1
> [url="10.0.0.12" pool="www"] phpfpm_max_active_processes value=2
> [url="10.0.0.12" pool="www"] phpfpm_slow_requests value=0
> [url="10.0.0.12" pool="www"] phpfpm_accepted_conn value=305
> [url="localhost" pool="www2"] phpfpm_max_children_reached value=0
> [url="localhost" pool="www2"] phpfpm_slow_requests value=0
> [url="localhost" pool="www2"] phpfpm_max_listen_queue value=0
> [url="localhost" pool="www2"] phpfpm_active_processes value=1
> [url="localhost" pool="www2"] phpfpm_listen_queue_len value=0
> [url="localhost" pool="www2"] phpfpm_idle_processes value=1
> [url="localhost" pool="www2"] phpfpm_total_processes value=2
> [url="localhost" pool="www2"] phpfpm_max_active_processes value=2
> [url="localhost" pool="www2"] phpfpm_accepted_conn value=306
> [url="localhost" pool="www2"] phpfpm_listen_queue value=0
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_children_reached value=0
> [url="10.0.0.12:9000" pool="www3"] phpfpm_slow_requests value=1
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_listen_queue value=0
> [url="10.0.0.12:9000" pool="www3"] phpfpm_active_processes value=1
> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue_len value=0
> [url="10.0.0.12:9000" pool="www3"] phpfpm_idle_processes value=2
> [url="10.0.0.12:9000" pool="www3"] phpfpm_total_processes value=2
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_active_processes value=2
> [url="10.0.0.12:9000" pool="www3"] phpfpm_accepted_conn value=307
> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue value=0
``` ```
## Note
When using `unixsocket`, you have to ensure that telegraf runs on same
host, and socket path is accessible to telegraf user.

View File

@ -7,6 +7,7 @@ import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -40,20 +41,25 @@ type phpfpm struct {
var sampleConfig = ` var sampleConfig = `
# An array of addresses to gather stats about. Specify an ip or hostname # An array of addresses to gather stats about. Specify an ip or hostname
# with optional port and path. # with optional port and path
# #
# Plugin can be configured in three modes (both can be used): # Plugin can be configured in three modes (either can be used):
# - http: the URL must start with http:// or https://, ex: # - http: the URL must start with http:// or https://, ie:
# "http://localhost/status" # "http://localhost/status"
# "http://192.168.130.1/status?full" # "http://192.168.130.1/status?full"
# - unixsocket: path to fpm socket, ex: #
# - unixsocket: path to fpm socket, ie:
# "/var/run/php5-fpm.sock" # "/var/run/php5-fpm.sock"
# "192.168.10.10:/var/run/php5-fpm-www2.sock" # or using a custom fpm status path:
# - fcgi: the URL mush start with fcgi:// or cgi://, and port must present, ex: # "/var/run/php5-fpm.sock:fpm-custom-status-path"
#
# - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
# "fcgi://10.0.0.12:9000/status" # "fcgi://10.0.0.12:9000/status"
# "cgi://10.0.10.12:9001/status" # "cgi://10.0.10.12:9001/status"
# #
# If no servers are specified, then default to 127.0.0.1/server-status # Example of multiple gathering from local socket and remove host
# urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"]
# If no servers are specified, then default to http://127.0.0.1/status
urls = ["http://localhost/status"] urls = ["http://localhost/status"]
` `
@ -62,7 +68,7 @@ func (r *phpfpm) SampleConfig() string {
} }
func (r *phpfpm) Description() string { func (r *phpfpm) Description() string {
return "Read metrics of phpfpm, via HTTP status page or socket(pending)" return "Read metrics of phpfpm, via HTTP status page or socket"
} }
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
@ -89,71 +95,96 @@ func (g *phpfpm) Gather(acc inputs.Accumulator) error {
return outerr return outerr
} }
// Request status page to get stat raw data // Request status page to get stat raw data and import it
func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error { func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error {
if g.client == nil { if g.client == nil {
client := &http.Client{} client := &http.Client{}
g.client = client g.client = client
} }
if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
return g.gatherHttp(addr, acc)
}
var (
fcgi *conn
socketPath string
statusPath string
)
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err) return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
} }
socketAddr := strings.Split(u.Host, ":")
req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme, fcgiIp := socketAddr[0]
u.Host, u.Path), nil) fcgiPort, _ := strconv.Atoi(socketAddr[1])
res, err := g.client.Do(req) fcgi, _ = NewClient(fcgiIp, fcgiPort)
if err != nil {
return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v",
addr, err)
}
if res.StatusCode != 200 {
return fmt.Errorf("Unable to get valid stat result from '%s': %v",
addr, err)
}
importMetric(res.Body, acc, u.Host)
} else { } else {
var ( socketAddr := strings.Split(addr, ":")
fcgi *FCGIClient if len(socketAddr) >= 2 {
fcgiAddr string socketPath = socketAddr[0]
) statusPath = socketAddr[1]
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
u, err := url.Parse(addr)
if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
}
socketAddr := strings.Split(u.Host, ":")
fcgiIp := socketAddr[0]
fcgiPort, _ := strconv.Atoi(socketAddr[1])
fcgiAddr = u.Host
fcgi, _ = NewClient(fcgiIp, fcgiPort)
} else { } else {
socketAddr := strings.Split(addr, ":") socketPath = socketAddr[0]
fcgiAddr = socketAddr[0] statusPath = "status"
fcgi, _ = NewClient("unix", socketAddr[1])
}
resOut, resErr, err := fcgi.Request(map[string]string{
"SCRIPT_NAME": "/status",
"SCRIPT_FILENAME": "status",
"REQUEST_METHOD": "GET",
}, "")
if len(resErr) == 0 && err == nil {
importMetric(bytes.NewReader(resOut), acc, fcgiAddr)
} }
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
}
fcgi, _ = NewClient("unix", socketPath)
}
return g.gatherFcgi(fcgi, statusPath, acc)
}
// Gather stat using fcgi protocol
func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulator) error {
fpmOutput, fpmErr, err := fcgi.Request(map[string]string{
"SCRIPT_NAME": "/" + statusPath,
"SCRIPT_FILENAME": statusPath,
"REQUEST_METHOD": "GET",
"CONTENT_LENGTH": "0",
"SERVER_PROTOCOL": "HTTP/1.0",
"SERVER_SOFTWARE": "go / fcgiclient ",
"REMOTE_ADDR": "127.0.0.1",
}, "/"+statusPath)
if len(fpmErr) == 0 && err == nil {
importMetric(bytes.NewReader(fpmOutput), acc)
return nil
} else {
return fmt.Errorf("Unable parse phpfpm status. Error: %v %v", string(fpmErr), err)
}
}
// Gather stat using http protocol
func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error {
u, err := url.Parse(addr)
if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
} }
req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme,
u.Host, u.Path), nil)
res, err := g.client.Do(req)
if err != nil {
return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v",
addr, err)
}
if res.StatusCode != 200 {
return fmt.Errorf("Unable to get valid stat result from '%s': %v",
addr, err)
}
importMetric(res.Body, acc)
return nil return nil
} }
// Import HTTP stat data into Telegraf system // Import stat data into Telegraf system
func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, error) { func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) {
stats := make(poolStat) stats := make(poolStat)
var currentPool string var currentPool string
@ -195,7 +226,6 @@ func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, e
// Finally, we push the pool metric // Finally, we push the pool metric
for pool := range stats { for pool := range stats {
tags := map[string]string{ tags := map[string]string{
"url": host,
"pool": pool, "pool": pool,
} }
fields := make(map[string]interface{}) fields := make(map[string]interface{})

View File

@ -1,13 +1,14 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package fcgi implements the FastCGI protocol.
// Currently only the responder role is supported.
// The protocol is defined at http://www.fastcgi.com/drupal/node/6?q=node/22
package phpfpm package phpfpm
// FastCGI client to request via socket // This file defines the raw protocol and some utilities used by the child and
// the host.
// Copyright 2012 Junqing Tan <ivan@mysqlab.net> and The Go Authors
// Use of this source code is governed by a BSD-style
// Part of source code is from Go fcgi package
// Fix bug: Can't recive more than 1 record untill FCGI_END_REQUEST 2012-09-15
// By: wofeiwo
import ( import (
"bufio" "bufio"
@ -15,70 +16,84 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"io" "io"
"sync"
"net" "net"
"strconv" "strconv"
"sync"
"strings"
) )
const FCGI_LISTENSOCK_FILENO uint8 = 0 // recType is a record type, as defined by
const FCGI_HEADER_LEN uint8 = 8 // http://www.fastcgi.com/devkit/doc/fcgi-spec.html#S8
const VERSION_1 uint8 = 1 type recType uint8
const FCGI_NULL_REQUEST_ID uint8 = 0
const FCGI_KEEP_CONN uint8 = 1
const ( const (
FCGI_BEGIN_REQUEST uint8 = iota + 1 typeBeginRequest recType = 1
FCGI_ABORT_REQUEST typeAbortRequest recType = 2
FCGI_END_REQUEST typeEndRequest recType = 3
FCGI_PARAMS typeParams recType = 4
FCGI_STDIN typeStdin recType = 5
FCGI_STDOUT typeStdout recType = 6
FCGI_STDERR typeStderr recType = 7
FCGI_DATA typeData recType = 8
FCGI_GET_VALUES typeGetValues recType = 9
FCGI_GET_VALUES_RESULT typeGetValuesResult recType = 10
FCGI_UNKNOWN_TYPE typeUnknownType recType = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
) )
const ( // keep the connection between web-server and responder open after request
FCGI_RESPONDER uint8 = iota + 1 const flagKeepConn = 1
FCGI_AUTHORIZER
FCGI_FILTER
)
const ( const (
FCGI_REQUEST_COMPLETE uint8 = iota maxWrite = 65535 // maximum record body
FCGI_CANT_MPX_CONN
FCGI_OVERLOADED
FCGI_UNKNOWN_ROLE
)
const (
FCGI_MAX_CONNS string = "MAX_CONNS"
FCGI_MAX_REQS string = "MAX_REQS"
FCGI_MPXS_CONNS string = "MPXS_CONNS"
)
const (
maxWrite = 6553500 // maximum record body
maxPad = 255 maxPad = 255
) )
const (
roleResponder = iota + 1 // only Responders are implemented.
roleAuthorizer
roleFilter
)
const (
statusRequestComplete = iota
statusCantMultiplex
statusOverloaded
statusUnknownRole
)
const headerLen = 8
type header struct { type header struct {
Version uint8 Version uint8
Type uint8 Type recType
Id uint16 Id uint16
ContentLength uint16 ContentLength uint16
PaddingLength uint8 PaddingLength uint8
Reserved uint8 Reserved uint8
} }
type beginRequest struct {
role uint16
flags uint8
reserved [5]uint8
}
func (br *beginRequest) read(content []byte) error {
if len(content) != 8 {
return errors.New("fcgi: invalid begin request record")
}
br.role = binary.BigEndian.Uint16(content)
br.flags = content[2]
return nil
}
// for padding so we don't have to allocate all the time // for padding so we don't have to allocate all the time
// not synchronized because we don't care what the contents are // not synchronized because we don't care what the contents are
var pad [maxPad]byte var pad [maxPad]byte
func (h *header) init(recType uint8, reqId uint16, contentLength int) { func (h *header) init(recType recType, reqId uint16, contentLength int) {
h.Version = 1 h.Version = 1
h.Type = recType h.Type = recType
h.Id = reqId h.Id = reqId
@ -86,6 +101,26 @@ func (h *header) init(recType uint8, reqId uint16, contentLength int) {
h.PaddingLength = uint8(-contentLength & 7) h.PaddingLength = uint8(-contentLength & 7)
} }
// conn sends records over rwc
type conn struct {
mutex sync.Mutex
rwc io.ReadWriteCloser
// to avoid allocations
buf bytes.Buffer
h header
}
func newConn(rwc io.ReadWriteCloser) *conn {
return &conn{rwc: rwc}
}
func (c *conn) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.rwc.Close()
}
type record struct { type record struct {
h header h header
buf [maxWrite + maxPad]byte buf [maxWrite + maxPad]byte
@ -109,69 +144,39 @@ func (r *record) content() []byte {
return r.buf[:r.h.ContentLength] return r.buf[:r.h.ContentLength]
} }
type FCGIClient struct { // writeRecord writes and sends a single record.
mutex sync.Mutex func (c *conn) writeRecord(recType recType, reqId uint16, b []byte) error {
rwc io.ReadWriteCloser c.mutex.Lock()
h header defer c.mutex.Unlock()
buf bytes.Buffer c.buf.Reset()
keepAlive bool c.h.init(recType, reqId, len(b))
} if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil {
func NewClient(h string, args ...interface{}) (fcgi *FCGIClient, err error) {
var conn net.Conn
if len(args) != 1 {
err = errors.New("fcgi: not enough params")
return
}
switch args[0].(type) {
case int:
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
conn, err = net.Dial("tcp", addr)
case string:
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
conn, err = net.DialUnix(h, nil, &laddr)
default:
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
}
fcgi = &FCGIClient{
rwc: conn,
keepAlive: false,
}
return
}
func (client *FCGIClient) writeRecord(recType uint8, reqId uint16, content []byte) (err error) {
client.mutex.Lock()
defer client.mutex.Unlock()
client.buf.Reset()
client.h.init(recType, reqId, len(content))
if err := binary.Write(&client.buf, binary.BigEndian, client.h); err != nil {
return err return err
} }
if _, err := client.buf.Write(content); err != nil { if _, err := c.buf.Write(b); err != nil {
return err return err
} }
if _, err := client.buf.Write(pad[:client.h.PaddingLength]); err != nil { if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil {
return err return err
} }
_, err = client.rwc.Write(client.buf.Bytes()) _, err := c.rwc.Write(c.buf.Bytes())
return err return err
} }
func (client *FCGIClient) writeBeginRequest(reqId uint16, role uint16, flags uint8) error { func (c *conn) writeBeginRequest(reqId uint16, role uint16, flags uint8) error {
b := [8]byte{byte(role >> 8), byte(role), flags} b := [8]byte{byte(role >> 8), byte(role), flags}
return client.writeRecord(FCGI_BEGIN_REQUEST, reqId, b[:]) return c.writeRecord(typeBeginRequest, reqId, b[:])
} }
func (client *FCGIClient) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error { func (c *conn) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error {
b := make([]byte, 8) b := make([]byte, 8)
binary.BigEndian.PutUint32(b, uint32(appStatus)) binary.BigEndian.PutUint32(b, uint32(appStatus))
b[4] = protocolStatus b[4] = protocolStatus
return client.writeRecord(FCGI_END_REQUEST, reqId, b) return c.writeRecord(typeEndRequest, reqId, b)
} }
func (client *FCGIClient) writePairs(recType uint8, reqId uint16, pairs map[string]string) error { func (c *conn) writePairs(recType recType, reqId uint16, pairs map[string]string) error {
w := newWriter(client, recType, reqId) w := newWriter(c, recType, reqId)
b := make([]byte, 8) b := make([]byte, 8)
for k, v := range pairs { for k, v := range pairs {
n := encodeSize(b, uint32(len(k))) n := encodeSize(b, uint32(len(k)))
@ -238,7 +243,7 @@ func (w *bufWriter) Close() error {
return w.closer.Close() return w.closer.Close()
} }
func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter { func newWriter(c *conn, recType recType, reqId uint16) *bufWriter {
s := &streamWriter{c: c, recType: recType, reqId: reqId} s := &streamWriter{c: c, recType: recType, reqId: reqId}
w := bufio.NewWriterSize(s, maxWrite) w := bufio.NewWriterSize(s, maxWrite)
return &bufWriter{s, w} return &bufWriter{s, w}
@ -247,8 +252,8 @@ func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter {
// streamWriter abstracts out the separation of a stream into discrete records. // streamWriter abstracts out the separation of a stream into discrete records.
// It only writes maxWrite bytes at a time. // It only writes maxWrite bytes at a time.
type streamWriter struct { type streamWriter struct {
c *FCGIClient c *conn
recType uint8 recType recType
reqId uint16 reqId uint16
} }
@ -273,22 +278,44 @@ func (w *streamWriter) Close() error {
return w.c.writeRecord(w.recType, w.reqId, nil) return w.c.writeRecord(w.recType, w.reqId, nil)
} }
func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout []byte, reterr []byte, err error) { func NewClient(h string, args ...interface{}) (fcgi *conn, err error) {
var con net.Conn
if len(args) != 1 {
err = errors.New("fcgi: not enough params")
return
}
switch args[0].(type) {
case int:
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
con, err = net.Dial("tcp", addr)
case string:
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
con, err = net.DialUnix(h, nil, &laddr)
default:
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
}
fcgi = &conn{
rwc: con,
}
return
}
var reqId uint16 = 1 func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) {
defer client.rwc.Close() defer client.rwc.Close()
var reqId uint16 = 1
err = client.writeBeginRequest(reqId, uint16(FCGI_RESPONDER), 0) err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
if err != nil { if err != nil {
return return
} }
err = client.writePairs(FCGI_PARAMS, reqId, env)
err = client.writePairs(typeParams, reqId, env)
if err != nil { if err != nil {
return return
} }
if len(reqStr) > 0 {
err = client.writeRecord(FCGI_STDIN, reqId, []byte(reqStr)) if len(requestData) > 0 {
if err != nil { if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
return return
} }
} }
@ -297,23 +324,25 @@ func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout
var err1 error var err1 error
// recive untill EOF or FCGI_END_REQUEST // recive untill EOF or FCGI_END_REQUEST
READ_LOOP:
for { for {
err1 = rec.read(client.rwc) err1 = rec.read(client.rwc)
if err1 != nil { if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
if err1 != io.EOF { if err1 != io.EOF {
err = err1 err = err1
} }
break break
} }
switch { switch {
case rec.h.Type == FCGI_STDOUT: case rec.h.Type == typeStdout:
retout = append(retout, rec.content()...) retout = append(retout, rec.content()...)
case rec.h.Type == FCGI_STDERR: case rec.h.Type == typeStderr:
reterr = append(reterr, rec.content()...) reterr = append(reterr, rec.content()...)
case rec.h.Type == FCGI_END_REQUEST: case rec.h.Type == typeEndRequest:
fallthrough fallthrough
default: default:
break break READ_LOOP
} }
} }

View File

@ -1,24 +1,34 @@
package phpfpm package phpfpm
import ( import (
"crypto/rand"
"encoding/binary"
"fmt" "fmt"
"net"
"net/http"
"net/http/fcgi"
"net/http/httptest"
"testing" "testing"
"github.com/influxdb/telegraf/testutil" "github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
) )
func TestPhpFpmGeneratesMetrics(t *testing.T) { type statServer struct{}
//We create a fake server to return test data
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // We create a fake server to return test data
fmt.Fprint(w, outputSample) func (s statServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
})) w.Header().Set("Content-Type", "text/plain")
w.Header().Set("Content-Length", fmt.Sprint(len(outputSample)))
fmt.Fprint(w, outputSample)
}
func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
sv := statServer{}
ts := httptest.NewServer(sv)
defer ts.Close() defer ts.Close()
//Now we tested again above server, with our authentication data
r := &phpfpm{ r := &phpfpm{
Urls: []string{ts.URL}, Urls: []string{ts.URL},
} }
@ -29,7 +39,134 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
tags := map[string]string{ tags := map[string]string{
"url": ts.Listener.Addr().String(), "pool": "www",
}
fields := map[string]interface{}{
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}
func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
// Let OS find an available port
tcp, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal("Cannot initalize test server")
}
defer tcp.Close()
s := statServer{}
go fcgi.Serve(tcp, s)
//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
}
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"pool": "www",
}
fields := map[string]interface{}{
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}
func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
// Create a socket in /tmp because we always have write permission and if the
// removing of socket fail when system restart /tmp is clear so
// we don't have junk files around
var randomNumber int64
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber))
if err != nil {
t.Fatal("Cannot initalize server on port ")
}
defer tcp.Close()
s := statServer{}
go fcgi.Serve(tcp, s)
r := &phpfpm{
Urls: []string{tcp.Addr().String()},
}
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"pool": "www",
}
fields := map[string]interface{}{
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}
func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// Create a socket in /tmp because we always have write permission. If the
// removing of socket fail we won't have junk files around. Cuz when system
// restart, it clears out /tmp
var randomNumber int64
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber))
if err != nil {
t.Fatal("Cannot initalize server on port ")
}
defer tcp.Close()
s := statServer{}
go fcgi.Serve(tcp, s)
r := &phpfpm{
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
}
var acc testutil.Accumulator
err = r.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"pool": "www", "pool": "www",
} }
@ -51,7 +188,7 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) {
//When not passing server config, we default to localhost //When not passing server config, we default to localhost
//We just want to make sure we did request stat from localhost //We just want to make sure we did request stat from localhost
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
r := &phpfpm{} r := &phpfpm{}
var acc testutil.Accumulator var acc testutil.Accumulator
@ -61,6 +198,31 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
assert.Contains(t, err.Error(), "127.0.0.1/status") assert.Contains(t, err.Error(), "127.0.0.1/status")
} }
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
r := &phpfpm{
Urls: []string{"http://aninvalidone"},
}
var acc testutil.Accumulator
err := r.Gather(&acc)
require.Error(t, err)
assert.Contains(t, err.Error(), `Unable to connect to phpfpm status page 'http://aninvalidone': Get http://aninvalidone: dial tcp: lookup aninvalidone`)
}
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
r := &phpfpm{
Urls: []string{"/tmp/invalid.sock"},
}
var acc testutil.Accumulator
err := r.Gather(&acc)
require.Error(t, err)
assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error())
}
const outputSample = ` const outputSample = `
pool: www pool: www
process manager: dynamic process manager: dynamic