parent
52f4980bc2
commit
41e2c7f6da
|
@ -0,0 +1,331 @@
|
|||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package phpfpm
|
||||
|
||||
// This file implements FastCGI from the perspective of a child process.
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/cgi"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// request holds the state for an in-progress request. As soon as it's complete,
|
||||
// it's converted to an http.Request.
|
||||
type request struct {
|
||||
pw *io.PipeWriter
|
||||
reqId uint16
|
||||
params map[string]string
|
||||
buf [1024]byte
|
||||
rawParams []byte
|
||||
keepConn bool
|
||||
}
|
||||
|
||||
func newRequest(reqId uint16, flags uint8) *request {
|
||||
r := &request{
|
||||
reqId: reqId,
|
||||
params: map[string]string{},
|
||||
keepConn: flags&flagKeepConn != 0,
|
||||
}
|
||||
r.rawParams = r.buf[:0]
|
||||
return r
|
||||
}
|
||||
|
||||
// parseParams reads an encoded []byte into Params.
|
||||
func (r *request) parseParams() {
|
||||
text := r.rawParams
|
||||
r.rawParams = nil
|
||||
for len(text) > 0 {
|
||||
keyLen, n := readSize(text)
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
text = text[n:]
|
||||
valLen, n := readSize(text)
|
||||
if n == 0 {
|
||||
return
|
||||
}
|
||||
text = text[n:]
|
||||
if int(keyLen)+int(valLen) > len(text) {
|
||||
return
|
||||
}
|
||||
key := readString(text, keyLen)
|
||||
text = text[keyLen:]
|
||||
val := readString(text, valLen)
|
||||
text = text[valLen:]
|
||||
r.params[key] = val
|
||||
}
|
||||
}
|
||||
|
||||
// response implements http.ResponseWriter.
|
||||
type response struct {
|
||||
req *request
|
||||
header http.Header
|
||||
w *bufWriter
|
||||
wroteHeader bool
|
||||
}
|
||||
|
||||
func newResponse(c *child, req *request) *response {
|
||||
return &response{
|
||||
req: req,
|
||||
header: http.Header{},
|
||||
w: newWriter(c.conn, typeStdout, req.reqId),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *response) Header() http.Header {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *response) Write(data []byte) (int, error) {
|
||||
if !r.wroteHeader {
|
||||
r.WriteHeader(http.StatusOK)
|
||||
}
|
||||
return r.w.Write(data)
|
||||
}
|
||||
|
||||
func (r *response) WriteHeader(code int) {
|
||||
if r.wroteHeader {
|
||||
return
|
||||
}
|
||||
r.wroteHeader = true
|
||||
if code == http.StatusNotModified {
|
||||
// Must not have body.
|
||||
r.header.Del("Content-Type")
|
||||
r.header.Del("Content-Length")
|
||||
r.header.Del("Transfer-Encoding")
|
||||
} else if r.header.Get("Content-Type") == "" {
|
||||
r.header.Set("Content-Type", "text/html; charset=utf-8")
|
||||
}
|
||||
|
||||
if r.header.Get("Date") == "" {
|
||||
r.header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
||||
}
|
||||
|
||||
fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code))
|
||||
r.header.Write(r.w)
|
||||
r.w.WriteString("\r\n")
|
||||
}
|
||||
|
||||
func (r *response) Flush() {
|
||||
if !r.wroteHeader {
|
||||
r.WriteHeader(http.StatusOK)
|
||||
}
|
||||
r.w.Flush()
|
||||
}
|
||||
|
||||
func (r *response) Close() error {
|
||||
r.Flush()
|
||||
return r.w.Close()
|
||||
}
|
||||
|
||||
type child struct {
|
||||
conn *conn
|
||||
handler http.Handler
|
||||
|
||||
mu sync.Mutex // protects requests:
|
||||
requests map[uint16]*request // keyed by request ID
|
||||
}
|
||||
|
||||
func newChild(rwc io.ReadWriteCloser, handler http.Handler) *child {
|
||||
return &child{
|
||||
conn: newConn(rwc),
|
||||
handler: handler,
|
||||
requests: make(map[uint16]*request),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *child) serve() {
|
||||
defer c.conn.Close()
|
||||
defer c.cleanUp()
|
||||
var rec record
|
||||
for {
|
||||
if err := rec.read(c.conn.rwc); err != nil {
|
||||
return
|
||||
}
|
||||
if err := c.handleRecord(&rec); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var errCloseConn = errors.New("fcgi: connection should be closed")
|
||||
|
||||
var emptyBody = ioutil.NopCloser(strings.NewReader(""))
|
||||
|
||||
// ErrRequestAborted is returned by Read when a handler attempts to read the
|
||||
// body of a request that has been aborted by the web server.
|
||||
var ErrRequestAborted = errors.New("fcgi: request aborted by web server")
|
||||
|
||||
// ErrConnClosed is returned by Read when a handler attempts to read the body of
|
||||
// a request after the connection to the web server has been closed.
|
||||
var ErrConnClosed = errors.New("fcgi: connection to web server closed")
|
||||
|
||||
func (c *child) handleRecord(rec *record) error {
|
||||
c.mu.Lock()
|
||||
req, ok := c.requests[rec.h.Id]
|
||||
c.mu.Unlock()
|
||||
if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues {
|
||||
// The spec says to ignore unknown request IDs.
|
||||
return nil
|
||||
}
|
||||
|
||||
switch rec.h.Type {
|
||||
case typeBeginRequest:
|
||||
if req != nil {
|
||||
// The server is trying to begin a request with the same ID
|
||||
// as an in-progress request. This is an error.
|
||||
return errors.New("fcgi: received ID that is already in-flight")
|
||||
}
|
||||
|
||||
var br beginRequest
|
||||
if err := br.read(rec.content()); err != nil {
|
||||
return err
|
||||
}
|
||||
if br.role != roleResponder {
|
||||
c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole)
|
||||
return nil
|
||||
}
|
||||
req = newRequest(rec.h.Id, br.flags)
|
||||
c.mu.Lock()
|
||||
c.requests[rec.h.Id] = req
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
case typeParams:
|
||||
// NOTE(eds): Technically a key-value pair can straddle the boundary
|
||||
// between two packets. We buffer until we've received all parameters.
|
||||
if len(rec.content()) > 0 {
|
||||
req.rawParams = append(req.rawParams, rec.content()...)
|
||||
return nil
|
||||
}
|
||||
req.parseParams()
|
||||
return nil
|
||||
case typeStdin:
|
||||
content := rec.content()
|
||||
if req.pw == nil {
|
||||
var body io.ReadCloser
|
||||
if len(content) > 0 {
|
||||
// body could be an io.LimitReader, but it shouldn't matter
|
||||
// as long as both sides are behaving.
|
||||
body, req.pw = io.Pipe()
|
||||
} else {
|
||||
body = emptyBody
|
||||
}
|
||||
go c.serveRequest(req, body)
|
||||
}
|
||||
if len(content) > 0 {
|
||||
// TODO(eds): This blocks until the handler reads from the pipe.
|
||||
// If the handler takes a long time, it might be a problem.
|
||||
req.pw.Write(content)
|
||||
} else if req.pw != nil {
|
||||
req.pw.Close()
|
||||
}
|
||||
return nil
|
||||
case typeGetValues:
|
||||
values := map[string]string{"FCGI_MPXS_CONNS": "1"}
|
||||
c.conn.writePairs(typeGetValuesResult, 0, values)
|
||||
return nil
|
||||
case typeData:
|
||||
// If the filter role is implemented, read the data stream here.
|
||||
return nil
|
||||
case typeAbortRequest:
|
||||
c.mu.Lock()
|
||||
delete(c.requests, rec.h.Id)
|
||||
c.mu.Unlock()
|
||||
c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete)
|
||||
if req.pw != nil {
|
||||
req.pw.CloseWithError(ErrRequestAborted)
|
||||
}
|
||||
if !req.keepConn {
|
||||
// connection will close upon return
|
||||
return errCloseConn
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
b := make([]byte, 8)
|
||||
b[0] = byte(rec.h.Type)
|
||||
c.conn.writeRecord(typeUnknownType, 0, b)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *child) serveRequest(req *request, body io.ReadCloser) {
|
||||
r := newResponse(c, req)
|
||||
httpReq, err := cgi.RequestFromMap(req.params)
|
||||
if err != nil {
|
||||
// there was an error reading the request
|
||||
r.WriteHeader(http.StatusInternalServerError)
|
||||
c.conn.writeRecord(typeStderr, req.reqId, []byte(err.Error()))
|
||||
} else {
|
||||
httpReq.Body = body
|
||||
c.handler.ServeHTTP(r, httpReq)
|
||||
}
|
||||
r.Close()
|
||||
c.mu.Lock()
|
||||
delete(c.requests, req.reqId)
|
||||
c.mu.Unlock()
|
||||
c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete)
|
||||
|
||||
// Consume the entire body, so the host isn't still writing to
|
||||
// us when we close the socket below in the !keepConn case,
|
||||
// otherwise we'd send a RST. (golang.org/issue/4183)
|
||||
// TODO(bradfitz): also bound this copy in time. Or send
|
||||
// some sort of abort request to the host, so the host
|
||||
// can properly cut off the client sending all the data.
|
||||
// For now just bound it a little and
|
||||
io.CopyN(ioutil.Discard, body, 100<<20)
|
||||
body.Close()
|
||||
|
||||
if !req.keepConn {
|
||||
c.conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *child) cleanUp() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for _, req := range c.requests {
|
||||
if req.pw != nil {
|
||||
// race with call to Close in c.serveRequest doesn't matter because
|
||||
// Pipe(Reader|Writer).Close are idempotent
|
||||
req.pw.CloseWithError(ErrConnClosed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Serve accepts incoming FastCGI connections on the listener l, creating a new
|
||||
// goroutine for each. The goroutine reads requests and then calls handler
|
||||
// to reply to them.
|
||||
// If l is nil, Serve accepts connections from os.Stdin.
|
||||
// If handler is nil, http.DefaultServeMux is used.
|
||||
func Serve(l net.Listener, handler http.Handler) error {
|
||||
if l == nil {
|
||||
var err error
|
||||
l, err = net.FileListener(os.Stdin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer l.Close()
|
||||
}
|
||||
if handler == nil {
|
||||
handler = http.DefaultServeMux
|
||||
}
|
||||
for {
|
||||
rw, err := l.Accept()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c := newChild(rw, handler)
|
||||
go c.serve()
|
||||
}
|
||||
}
|
|
@ -17,11 +17,6 @@ import (
|
|||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"strings"
|
||||
)
|
||||
|
||||
// recType is a record type, as defined by
|
||||
|
@ -277,74 +272,3 @@ func (w *streamWriter) Close() error {
|
|||
// send empty record to close the stream
|
||||
return w.c.writeRecord(w.recType, w.reqId, nil)
|
||||
}
|
||||
|
||||
func NewClient(h string, args ...interface{}) (fcgi *conn, err error) {
|
||||
var con net.Conn
|
||||
if len(args) != 1 {
|
||||
err = errors.New("fcgi: not enough params")
|
||||
return
|
||||
}
|
||||
switch args[0].(type) {
|
||||
case int:
|
||||
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
||||
con, err = net.Dial("tcp", addr)
|
||||
case string:
|
||||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||
con, err = net.DialUnix(h, nil, &laddr)
|
||||
default:
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
|
||||
}
|
||||
fcgi = &conn{
|
||||
rwc: con,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) {
|
||||
defer client.rwc.Close()
|
||||
var reqId uint16 = 1
|
||||
|
||||
err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = client.writePairs(typeParams, reqId, env)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(requestData) > 0 {
|
||||
if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
rec := &record{}
|
||||
var err1 error
|
||||
|
||||
// recive untill EOF or FCGI_END_REQUEST
|
||||
READ_LOOP:
|
||||
for {
|
||||
err1 = rec.read(client.rwc)
|
||||
if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
|
||||
if err1 != io.EOF {
|
||||
err = err1
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
switch {
|
||||
case rec.h.Type == typeStdout:
|
||||
retout = append(retout, rec.content()...)
|
||||
case rec.h.Type == typeStderr:
|
||||
reterr = append(reterr, rec.content()...)
|
||||
case rec.h.Type == typeEndRequest:
|
||||
fallthrough
|
||||
default:
|
||||
break READ_LOOP
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
package phpfpm
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Create an fcgi client
|
||||
func newFcgiClient(h string, args ...interface{}) (*conn, error) {
|
||||
var con net.Conn
|
||||
if len(args) != 1 {
|
||||
return nil, errors.New("fcgi: not enough params")
|
||||
}
|
||||
|
||||
var err error
|
||||
switch args[0].(type) {
|
||||
case int:
|
||||
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
||||
con, err = net.Dial("tcp", addr)
|
||||
case string:
|
||||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||
con, err = net.DialUnix(h, nil, &laddr)
|
||||
default:
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
|
||||
}
|
||||
fcgi := &conn{
|
||||
rwc: con,
|
||||
}
|
||||
|
||||
return fcgi, err
|
||||
}
|
||||
|
||||
func (client *conn) Request(
|
||||
env map[string]string,
|
||||
requestData string,
|
||||
) (retout []byte, reterr []byte, err error) {
|
||||
defer client.rwc.Close()
|
||||
var reqId uint16 = 1
|
||||
|
||||
err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = client.writePairs(typeParams, reqId, env)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(requestData) > 0 {
|
||||
if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
rec := &record{}
|
||||
var err1 error
|
||||
|
||||
// recive untill EOF or FCGI_END_REQUEST
|
||||
READ_LOOP:
|
||||
for {
|
||||
err1 = rec.read(client.rwc)
|
||||
if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
|
||||
if err1 != io.EOF {
|
||||
err = err1
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
switch {
|
||||
case rec.h.Type == typeStdout:
|
||||
retout = append(retout, rec.content()...)
|
||||
case rec.h.Type == typeStderr:
|
||||
reterr = append(reterr, rec.content()...)
|
||||
case rec.h.Type == typeEndRequest:
|
||||
fallthrough
|
||||
default:
|
||||
break READ_LOOP
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -0,0 +1,280 @@
|
|||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package phpfpm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var sizeTests = []struct {
|
||||
size uint32
|
||||
bytes []byte
|
||||
}{
|
||||
{0, []byte{0x00}},
|
||||
{127, []byte{0x7F}},
|
||||
{128, []byte{0x80, 0x00, 0x00, 0x80}},
|
||||
{1000, []byte{0x80, 0x00, 0x03, 0xE8}},
|
||||
{33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}},
|
||||
}
|
||||
|
||||
func TestSize(t *testing.T) {
|
||||
b := make([]byte, 4)
|
||||
for i, test := range sizeTests {
|
||||
n := encodeSize(b, test.size)
|
||||
if !bytes.Equal(b[:n], test.bytes) {
|
||||
t.Errorf("%d expected %x, encoded %x", i, test.bytes, b)
|
||||
}
|
||||
size, n := readSize(test.bytes)
|
||||
if size != test.size {
|
||||
t.Errorf("%d expected %d, read %d", i, test.size, size)
|
||||
}
|
||||
if len(test.bytes) != n {
|
||||
t.Errorf("%d did not consume all the bytes", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var streamTests = []struct {
|
||||
desc string
|
||||
recType recType
|
||||
reqId uint16
|
||||
content []byte
|
||||
raw []byte
|
||||
}{
|
||||
{"single record", typeStdout, 1, nil,
|
||||
[]byte{1, byte(typeStdout), 0, 1, 0, 0, 0, 0},
|
||||
},
|
||||
// this data will have to be split into two records
|
||||
{"two records", typeStdin, 300, make([]byte, 66000),
|
||||
bytes.Join([][]byte{
|
||||
// header for the first record
|
||||
{1, byte(typeStdin), 0x01, 0x2C, 0xFF, 0xFF, 1, 0},
|
||||
make([]byte, 65536),
|
||||
// header for the second
|
||||
{1, byte(typeStdin), 0x01, 0x2C, 0x01, 0xD1, 7, 0},
|
||||
make([]byte, 472),
|
||||
// header for the empty record
|
||||
{1, byte(typeStdin), 0x01, 0x2C, 0, 0, 0, 0},
|
||||
},
|
||||
nil),
|
||||
},
|
||||
}
|
||||
|
||||
type nilCloser struct {
|
||||
io.ReadWriter
|
||||
}
|
||||
|
||||
func (c *nilCloser) Close() error { return nil }
|
||||
|
||||
func TestStreams(t *testing.T) {
|
||||
var rec record
|
||||
outer:
|
||||
for _, test := range streamTests {
|
||||
buf := bytes.NewBuffer(test.raw)
|
||||
var content []byte
|
||||
for buf.Len() > 0 {
|
||||
if err := rec.read(buf); err != nil {
|
||||
t.Errorf("%s: error reading record: %v", test.desc, err)
|
||||
continue outer
|
||||
}
|
||||
content = append(content, rec.content()...)
|
||||
}
|
||||
if rec.h.Type != test.recType {
|
||||
t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType)
|
||||
continue
|
||||
}
|
||||
if rec.h.Id != test.reqId {
|
||||
t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId)
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(content, test.content) {
|
||||
t.Errorf("%s: read wrong content", test.desc)
|
||||
continue
|
||||
}
|
||||
buf.Reset()
|
||||
c := newConn(&nilCloser{buf})
|
||||
w := newWriter(c, test.recType, test.reqId)
|
||||
if _, err := w.Write(test.content); err != nil {
|
||||
t.Errorf("%s: error writing record: %v", test.desc, err)
|
||||
continue
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
t.Errorf("%s: error closing stream: %v", test.desc, err)
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(buf.Bytes(), test.raw) {
|
||||
t.Errorf("%s: wrote wrong content", test.desc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type writeOnlyConn struct {
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (c *writeOnlyConn) Write(p []byte) (int, error) {
|
||||
c.buf = append(c.buf, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (c *writeOnlyConn) Read(p []byte) (int, error) {
|
||||
return 0, errors.New("conn is write-only")
|
||||
}
|
||||
|
||||
func (c *writeOnlyConn) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestGetValues(t *testing.T) {
|
||||
var rec record
|
||||
rec.h.Type = typeGetValues
|
||||
|
||||
wc := new(writeOnlyConn)
|
||||
c := newChild(wc, nil)
|
||||
err := c.handleRecord(&rec)
|
||||
if err != nil {
|
||||
t.Fatalf("handleRecord: %v", err)
|
||||
}
|
||||
|
||||
const want = "\x01\n\x00\x00\x00\x12\x06\x00" +
|
||||
"\x0f\x01FCGI_MPXS_CONNS1" +
|
||||
"\x00\x00\x00\x00\x00\x00\x01\n\x00\x00\x00\x00\x00\x00"
|
||||
if got := string(wc.buf); got != want {
|
||||
t.Errorf(" got: %q\nwant: %q\n", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func nameValuePair11(nameData, valueData string) []byte {
|
||||
return bytes.Join(
|
||||
[][]byte{
|
||||
{byte(len(nameData)), byte(len(valueData))},
|
||||
[]byte(nameData),
|
||||
[]byte(valueData),
|
||||
},
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
func makeRecord(
|
||||
recordType recType,
|
||||
requestId uint16,
|
||||
contentData []byte,
|
||||
) []byte {
|
||||
requestIdB1 := byte(requestId >> 8)
|
||||
requestIdB0 := byte(requestId)
|
||||
|
||||
contentLength := len(contentData)
|
||||
contentLengthB1 := byte(contentLength >> 8)
|
||||
contentLengthB0 := byte(contentLength)
|
||||
return bytes.Join([][]byte{
|
||||
{1, byte(recordType), requestIdB1, requestIdB0, contentLengthB1,
|
||||
contentLengthB0, 0, 0},
|
||||
contentData,
|
||||
},
|
||||
nil)
|
||||
}
|
||||
|
||||
// a series of FastCGI records that start a request and begin sending the
|
||||
// request body
|
||||
var streamBeginTypeStdin = bytes.Join([][]byte{
|
||||
// set up request 1
|
||||
makeRecord(typeBeginRequest, 1,
|
||||
[]byte{0, byte(roleResponder), 0, 0, 0, 0, 0, 0}),
|
||||
// add required parameters to request 1
|
||||
makeRecord(typeParams, 1, nameValuePair11("REQUEST_METHOD", "GET")),
|
||||
makeRecord(typeParams, 1, nameValuePair11("SERVER_PROTOCOL", "HTTP/1.1")),
|
||||
makeRecord(typeParams, 1, nil),
|
||||
// begin sending body of request 1
|
||||
makeRecord(typeStdin, 1, []byte("0123456789abcdef")),
|
||||
},
|
||||
nil)
|
||||
|
||||
var cleanUpTests = []struct {
|
||||
input []byte
|
||||
err error
|
||||
}{
|
||||
// confirm that child.handleRecord closes req.pw after aborting req
|
||||
{
|
||||
bytes.Join([][]byte{
|
||||
streamBeginTypeStdin,
|
||||
makeRecord(typeAbortRequest, 1, nil),
|
||||
},
|
||||
nil),
|
||||
ErrRequestAborted,
|
||||
},
|
||||
// confirm that child.serve closes all pipes after error reading record
|
||||
{
|
||||
bytes.Join([][]byte{
|
||||
streamBeginTypeStdin,
|
||||
nil,
|
||||
},
|
||||
nil),
|
||||
ErrConnClosed,
|
||||
},
|
||||
}
|
||||
|
||||
type nopWriteCloser struct {
|
||||
io.ReadWriter
|
||||
}
|
||||
|
||||
func (nopWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test that child.serve closes the bodies of aborted requests and closes the
|
||||
// bodies of all requests before returning. Causes deadlock if either condition
|
||||
// isn't met. See issue 6934.
|
||||
func TestChildServeCleansUp(t *testing.T) {
|
||||
for _, tt := range cleanUpTests {
|
||||
input := make([]byte, len(tt.input))
|
||||
copy(input, tt.input)
|
||||
rc := nopWriteCloser{bytes.NewBuffer(input)}
|
||||
done := make(chan bool)
|
||||
c := newChild(rc, http.HandlerFunc(func(
|
||||
w http.ResponseWriter,
|
||||
r *http.Request,
|
||||
) {
|
||||
// block on reading body of request
|
||||
_, err := io.Copy(ioutil.Discard, r.Body)
|
||||
if err != tt.err {
|
||||
t.Errorf("Expected %#v, got %#v", tt.err, err)
|
||||
}
|
||||
// not reached if body of request isn't closed
|
||||
done <- true
|
||||
}))
|
||||
go c.serve()
|
||||
// wait for body of request to be closed or all goroutines to block
|
||||
<-done
|
||||
}
|
||||
}
|
||||
|
||||
type rwNopCloser struct {
|
||||
io.Reader
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (rwNopCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verifies it doesn't crash. Issue 11824.
|
||||
func TestMalformedParams(t *testing.T) {
|
||||
input := []byte{
|
||||
// beginRequest, requestId=1, contentLength=8, role=1, keepConn=1
|
||||
1, 1, 0, 1, 0, 8, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0,
|
||||
// params, requestId=1, contentLength=10, k1Len=50, v1Len=50 (malformed, wrong length)
|
||||
1, 4, 0, 1, 0, 10, 0, 0, 50, 50, 3, 4, 5, 6, 7, 8, 9, 10,
|
||||
// end of params
|
||||
1, 4, 0, 1, 0, 0, 0, 0,
|
||||
}
|
||||
rw := rwNopCloser{bytes.NewReader(input), ioutil.Discard}
|
||||
c := newChild(rw, http.DefaultServeMux)
|
||||
c.serve()
|
||||
}
|
|
@ -112,6 +112,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
|||
statusPath string
|
||||
)
|
||||
|
||||
var err error
|
||||
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
|
@ -120,7 +121,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
|||
socketAddr := strings.Split(u.Host, ":")
|
||||
fcgiIp := socketAddr[0]
|
||||
fcgiPort, _ := strconv.Atoi(socketAddr[1])
|
||||
fcgi, _ = NewClient(fcgiIp, fcgiPort)
|
||||
fcgi, err = newFcgiClient(fcgiIp, fcgiPort)
|
||||
} else {
|
||||
socketAddr := strings.Split(addr, ":")
|
||||
if len(socketAddr) >= 2 {
|
||||
|
@ -134,8 +135,13 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
|||
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
|
||||
}
|
||||
fcgi, _ = NewClient("unix", socketPath)
|
||||
fcgi, err = newFcgiClient("unix", socketPath)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return g.gatherFcgi(fcgi, statusPath, acc)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue