Break out fcgi code into orig Go files, don't ignore errs

closes #816
This commit is contained in:
Cameron Sparr 2016-03-09 13:11:59 +01:00
parent bd3d0c330f
commit 805db7ca50
5 changed files with 705 additions and 78 deletions

View File

@ -0,0 +1,331 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package phpfpm
// This file implements FastCGI from the perspective of a child process.
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/cgi"
"os"
"strings"
"sync"
"time"
)
// request holds the state for an in-progress request. As soon as it's complete,
// it's converted to an http.Request.
type request struct {
pw *io.PipeWriter
reqId uint16
params map[string]string
buf [1024]byte
rawParams []byte
keepConn bool
}
func newRequest(reqId uint16, flags uint8) *request {
r := &request{
reqId: reqId,
params: map[string]string{},
keepConn: flags&flagKeepConn != 0,
}
r.rawParams = r.buf[:0]
return r
}
// parseParams reads an encoded []byte into Params.
func (r *request) parseParams() {
text := r.rawParams
r.rawParams = nil
for len(text) > 0 {
keyLen, n := readSize(text)
if n == 0 {
return
}
text = text[n:]
valLen, n := readSize(text)
if n == 0 {
return
}
text = text[n:]
if int(keyLen)+int(valLen) > len(text) {
return
}
key := readString(text, keyLen)
text = text[keyLen:]
val := readString(text, valLen)
text = text[valLen:]
r.params[key] = val
}
}
// response implements http.ResponseWriter.
type response struct {
req *request
header http.Header
w *bufWriter
wroteHeader bool
}
func newResponse(c *child, req *request) *response {
return &response{
req: req,
header: http.Header{},
w: newWriter(c.conn, typeStdout, req.reqId),
}
}
func (r *response) Header() http.Header {
return r.header
}
func (r *response) Write(data []byte) (int, error) {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
return r.w.Write(data)
}
func (r *response) WriteHeader(code int) {
if r.wroteHeader {
return
}
r.wroteHeader = true
if code == http.StatusNotModified {
// Must not have body.
r.header.Del("Content-Type")
r.header.Del("Content-Length")
r.header.Del("Transfer-Encoding")
} else if r.header.Get("Content-Type") == "" {
r.header.Set("Content-Type", "text/html; charset=utf-8")
}
if r.header.Get("Date") == "" {
r.header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
}
fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code))
r.header.Write(r.w)
r.w.WriteString("\r\n")
}
func (r *response) Flush() {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
r.w.Flush()
}
func (r *response) Close() error {
r.Flush()
return r.w.Close()
}
type child struct {
conn *conn
handler http.Handler
mu sync.Mutex // protects requests:
requests map[uint16]*request // keyed by request ID
}
func newChild(rwc io.ReadWriteCloser, handler http.Handler) *child {
return &child{
conn: newConn(rwc),
handler: handler,
requests: make(map[uint16]*request),
}
}
func (c *child) serve() {
defer c.conn.Close()
defer c.cleanUp()
var rec record
for {
if err := rec.read(c.conn.rwc); err != nil {
return
}
if err := c.handleRecord(&rec); err != nil {
return
}
}
}
var errCloseConn = errors.New("fcgi: connection should be closed")
var emptyBody = ioutil.NopCloser(strings.NewReader(""))
// ErrRequestAborted is returned by Read when a handler attempts to read the
// body of a request that has been aborted by the web server.
var ErrRequestAborted = errors.New("fcgi: request aborted by web server")
// ErrConnClosed is returned by Read when a handler attempts to read the body of
// a request after the connection to the web server has been closed.
var ErrConnClosed = errors.New("fcgi: connection to web server closed")
func (c *child) handleRecord(rec *record) error {
c.mu.Lock()
req, ok := c.requests[rec.h.Id]
c.mu.Unlock()
if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues {
// The spec says to ignore unknown request IDs.
return nil
}
switch rec.h.Type {
case typeBeginRequest:
if req != nil {
// The server is trying to begin a request with the same ID
// as an in-progress request. This is an error.
return errors.New("fcgi: received ID that is already in-flight")
}
var br beginRequest
if err := br.read(rec.content()); err != nil {
return err
}
if br.role != roleResponder {
c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole)
return nil
}
req = newRequest(rec.h.Id, br.flags)
c.mu.Lock()
c.requests[rec.h.Id] = req
c.mu.Unlock()
return nil
case typeParams:
// NOTE(eds): Technically a key-value pair can straddle the boundary
// between two packets. We buffer until we've received all parameters.
if len(rec.content()) > 0 {
req.rawParams = append(req.rawParams, rec.content()...)
return nil
}
req.parseParams()
return nil
case typeStdin:
content := rec.content()
if req.pw == nil {
var body io.ReadCloser
if len(content) > 0 {
// body could be an io.LimitReader, but it shouldn't matter
// as long as both sides are behaving.
body, req.pw = io.Pipe()
} else {
body = emptyBody
}
go c.serveRequest(req, body)
}
if len(content) > 0 {
// TODO(eds): This blocks until the handler reads from the pipe.
// If the handler takes a long time, it might be a problem.
req.pw.Write(content)
} else if req.pw != nil {
req.pw.Close()
}
return nil
case typeGetValues:
values := map[string]string{"FCGI_MPXS_CONNS": "1"}
c.conn.writePairs(typeGetValuesResult, 0, values)
return nil
case typeData:
// If the filter role is implemented, read the data stream here.
return nil
case typeAbortRequest:
c.mu.Lock()
delete(c.requests, rec.h.Id)
c.mu.Unlock()
c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete)
if req.pw != nil {
req.pw.CloseWithError(ErrRequestAborted)
}
if !req.keepConn {
// connection will close upon return
return errCloseConn
}
return nil
default:
b := make([]byte, 8)
b[0] = byte(rec.h.Type)
c.conn.writeRecord(typeUnknownType, 0, b)
return nil
}
}
func (c *child) serveRequest(req *request, body io.ReadCloser) {
r := newResponse(c, req)
httpReq, err := cgi.RequestFromMap(req.params)
if err != nil {
// there was an error reading the request
r.WriteHeader(http.StatusInternalServerError)
c.conn.writeRecord(typeStderr, req.reqId, []byte(err.Error()))
} else {
httpReq.Body = body
c.handler.ServeHTTP(r, httpReq)
}
r.Close()
c.mu.Lock()
delete(c.requests, req.reqId)
c.mu.Unlock()
c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete)
// Consume the entire body, so the host isn't still writing to
// us when we close the socket below in the !keepConn case,
// otherwise we'd send a RST. (golang.org/issue/4183)
// TODO(bradfitz): also bound this copy in time. Or send
// some sort of abort request to the host, so the host
// can properly cut off the client sending all the data.
// For now just bound it a little and
io.CopyN(ioutil.Discard, body, 100<<20)
body.Close()
if !req.keepConn {
c.conn.Close()
}
}
func (c *child) cleanUp() {
c.mu.Lock()
defer c.mu.Unlock()
for _, req := range c.requests {
if req.pw != nil {
// race with call to Close in c.serveRequest doesn't matter because
// Pipe(Reader|Writer).Close are idempotent
req.pw.CloseWithError(ErrConnClosed)
}
}
}
// Serve accepts incoming FastCGI connections on the listener l, creating a new
// goroutine for each. The goroutine reads requests and then calls handler
// to reply to them.
// If l is nil, Serve accepts connections from os.Stdin.
// If handler is nil, http.DefaultServeMux is used.
func Serve(l net.Listener, handler http.Handler) error {
if l == nil {
var err error
l, err = net.FileListener(os.Stdin)
if err != nil {
return err
}
defer l.Close()
}
if handler == nil {
handler = http.DefaultServeMux
}
for {
rw, err := l.Accept()
if err != nil {
return err
}
c := newChild(rw, handler)
go c.serve()
}
}

View File

@ -17,11 +17,6 @@ import (
"errors" "errors"
"io" "io"
"sync" "sync"
"net"
"strconv"
"strings"
) )
// recType is a record type, as defined by // recType is a record type, as defined by
@ -277,74 +272,3 @@ func (w *streamWriter) Close() error {
// send empty record to close the stream // send empty record to close the stream
return w.c.writeRecord(w.recType, w.reqId, nil) return w.c.writeRecord(w.recType, w.reqId, nil)
} }
func NewClient(h string, args ...interface{}) (fcgi *conn, err error) {
var con net.Conn
if len(args) != 1 {
err = errors.New("fcgi: not enough params")
return
}
switch args[0].(type) {
case int:
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
con, err = net.Dial("tcp", addr)
case string:
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
con, err = net.DialUnix(h, nil, &laddr)
default:
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
}
fcgi = &conn{
rwc: con,
}
return
}
func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) {
defer client.rwc.Close()
var reqId uint16 = 1
err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
if err != nil {
return
}
err = client.writePairs(typeParams, reqId, env)
if err != nil {
return
}
if len(requestData) > 0 {
if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
return
}
}
rec := &record{}
var err1 error
// recive untill EOF or FCGI_END_REQUEST
READ_LOOP:
for {
err1 = rec.read(client.rwc)
if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
if err1 != io.EOF {
err = err1
}
break
}
switch {
case rec.h.Type == typeStdout:
retout = append(retout, rec.content()...)
case rec.h.Type == typeStderr:
reterr = append(reterr, rec.content()...)
case rec.h.Type == typeEndRequest:
fallthrough
default:
break READ_LOOP
}
}
return
}

View File

@ -0,0 +1,86 @@
package phpfpm
import (
"errors"
"io"
"net"
"strconv"
"strings"
)
// Create an fcgi client
func newFcgiClient(h string, args ...interface{}) (*conn, error) {
var con net.Conn
if len(args) != 1 {
return nil, errors.New("fcgi: not enough params")
}
var err error
switch args[0].(type) {
case int:
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
con, err = net.Dial("tcp", addr)
case string:
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
con, err = net.DialUnix(h, nil, &laddr)
default:
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
}
fcgi := &conn{
rwc: con,
}
return fcgi, err
}
func (client *conn) Request(
env map[string]string,
requestData string,
) (retout []byte, reterr []byte, err error) {
defer client.rwc.Close()
var reqId uint16 = 1
err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
if err != nil {
return
}
err = client.writePairs(typeParams, reqId, env)
if err != nil {
return
}
if len(requestData) > 0 {
if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
return
}
}
rec := &record{}
var err1 error
// recive untill EOF or FCGI_END_REQUEST
READ_LOOP:
for {
err1 = rec.read(client.rwc)
if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
if err1 != io.EOF {
err = err1
}
break
}
switch {
case rec.h.Type == typeStdout:
retout = append(retout, rec.content()...)
case rec.h.Type == typeStderr:
reterr = append(reterr, rec.content()...)
case rec.h.Type == typeEndRequest:
fallthrough
default:
break READ_LOOP
}
}
return
}

View File

@ -0,0 +1,280 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package phpfpm
import (
"bytes"
"errors"
"io"
"io/ioutil"
"net/http"
"testing"
)
var sizeTests = []struct {
size uint32
bytes []byte
}{
{0, []byte{0x00}},
{127, []byte{0x7F}},
{128, []byte{0x80, 0x00, 0x00, 0x80}},
{1000, []byte{0x80, 0x00, 0x03, 0xE8}},
{33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}},
}
func TestSize(t *testing.T) {
b := make([]byte, 4)
for i, test := range sizeTests {
n := encodeSize(b, test.size)
if !bytes.Equal(b[:n], test.bytes) {
t.Errorf("%d expected %x, encoded %x", i, test.bytes, b)
}
size, n := readSize(test.bytes)
if size != test.size {
t.Errorf("%d expected %d, read %d", i, test.size, size)
}
if len(test.bytes) != n {
t.Errorf("%d did not consume all the bytes", i)
}
}
}
var streamTests = []struct {
desc string
recType recType
reqId uint16
content []byte
raw []byte
}{
{"single record", typeStdout, 1, nil,
[]byte{1, byte(typeStdout), 0, 1, 0, 0, 0, 0},
},
// this data will have to be split into two records
{"two records", typeStdin, 300, make([]byte, 66000),
bytes.Join([][]byte{
// header for the first record
{1, byte(typeStdin), 0x01, 0x2C, 0xFF, 0xFF, 1, 0},
make([]byte, 65536),
// header for the second
{1, byte(typeStdin), 0x01, 0x2C, 0x01, 0xD1, 7, 0},
make([]byte, 472),
// header for the empty record
{1, byte(typeStdin), 0x01, 0x2C, 0, 0, 0, 0},
},
nil),
},
}
type nilCloser struct {
io.ReadWriter
}
func (c *nilCloser) Close() error { return nil }
func TestStreams(t *testing.T) {
var rec record
outer:
for _, test := range streamTests {
buf := bytes.NewBuffer(test.raw)
var content []byte
for buf.Len() > 0 {
if err := rec.read(buf); err != nil {
t.Errorf("%s: error reading record: %v", test.desc, err)
continue outer
}
content = append(content, rec.content()...)
}
if rec.h.Type != test.recType {
t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType)
continue
}
if rec.h.Id != test.reqId {
t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId)
continue
}
if !bytes.Equal(content, test.content) {
t.Errorf("%s: read wrong content", test.desc)
continue
}
buf.Reset()
c := newConn(&nilCloser{buf})
w := newWriter(c, test.recType, test.reqId)
if _, err := w.Write(test.content); err != nil {
t.Errorf("%s: error writing record: %v", test.desc, err)
continue
}
if err := w.Close(); err != nil {
t.Errorf("%s: error closing stream: %v", test.desc, err)
continue
}
if !bytes.Equal(buf.Bytes(), test.raw) {
t.Errorf("%s: wrote wrong content", test.desc)
}
}
}
type writeOnlyConn struct {
buf []byte
}
func (c *writeOnlyConn) Write(p []byte) (int, error) {
c.buf = append(c.buf, p...)
return len(p), nil
}
func (c *writeOnlyConn) Read(p []byte) (int, error) {
return 0, errors.New("conn is write-only")
}
func (c *writeOnlyConn) Close() error {
return nil
}
func TestGetValues(t *testing.T) {
var rec record
rec.h.Type = typeGetValues
wc := new(writeOnlyConn)
c := newChild(wc, nil)
err := c.handleRecord(&rec)
if err != nil {
t.Fatalf("handleRecord: %v", err)
}
const want = "\x01\n\x00\x00\x00\x12\x06\x00" +
"\x0f\x01FCGI_MPXS_CONNS1" +
"\x00\x00\x00\x00\x00\x00\x01\n\x00\x00\x00\x00\x00\x00"
if got := string(wc.buf); got != want {
t.Errorf(" got: %q\nwant: %q\n", got, want)
}
}
func nameValuePair11(nameData, valueData string) []byte {
return bytes.Join(
[][]byte{
{byte(len(nameData)), byte(len(valueData))},
[]byte(nameData),
[]byte(valueData),
},
nil,
)
}
func makeRecord(
recordType recType,
requestId uint16,
contentData []byte,
) []byte {
requestIdB1 := byte(requestId >> 8)
requestIdB0 := byte(requestId)
contentLength := len(contentData)
contentLengthB1 := byte(contentLength >> 8)
contentLengthB0 := byte(contentLength)
return bytes.Join([][]byte{
{1, byte(recordType), requestIdB1, requestIdB0, contentLengthB1,
contentLengthB0, 0, 0},
contentData,
},
nil)
}
// a series of FastCGI records that start a request and begin sending the
// request body
var streamBeginTypeStdin = bytes.Join([][]byte{
// set up request 1
makeRecord(typeBeginRequest, 1,
[]byte{0, byte(roleResponder), 0, 0, 0, 0, 0, 0}),
// add required parameters to request 1
makeRecord(typeParams, 1, nameValuePair11("REQUEST_METHOD", "GET")),
makeRecord(typeParams, 1, nameValuePair11("SERVER_PROTOCOL", "HTTP/1.1")),
makeRecord(typeParams, 1, nil),
// begin sending body of request 1
makeRecord(typeStdin, 1, []byte("0123456789abcdef")),
},
nil)
var cleanUpTests = []struct {
input []byte
err error
}{
// confirm that child.handleRecord closes req.pw after aborting req
{
bytes.Join([][]byte{
streamBeginTypeStdin,
makeRecord(typeAbortRequest, 1, nil),
},
nil),
ErrRequestAborted,
},
// confirm that child.serve closes all pipes after error reading record
{
bytes.Join([][]byte{
streamBeginTypeStdin,
nil,
},
nil),
ErrConnClosed,
},
}
type nopWriteCloser struct {
io.ReadWriter
}
func (nopWriteCloser) Close() error {
return nil
}
// Test that child.serve closes the bodies of aborted requests and closes the
// bodies of all requests before returning. Causes deadlock if either condition
// isn't met. See issue 6934.
func TestChildServeCleansUp(t *testing.T) {
for _, tt := range cleanUpTests {
input := make([]byte, len(tt.input))
copy(input, tt.input)
rc := nopWriteCloser{bytes.NewBuffer(input)}
done := make(chan bool)
c := newChild(rc, http.HandlerFunc(func(
w http.ResponseWriter,
r *http.Request,
) {
// block on reading body of request
_, err := io.Copy(ioutil.Discard, r.Body)
if err != tt.err {
t.Errorf("Expected %#v, got %#v", tt.err, err)
}
// not reached if body of request isn't closed
done <- true
}))
go c.serve()
// wait for body of request to be closed or all goroutines to block
<-done
}
}
type rwNopCloser struct {
io.Reader
io.Writer
}
func (rwNopCloser) Close() error {
return nil
}
// Verifies it doesn't crash. Issue 11824.
func TestMalformedParams(t *testing.T) {
input := []byte{
// beginRequest, requestId=1, contentLength=8, role=1, keepConn=1
1, 1, 0, 1, 0, 8, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0,
// params, requestId=1, contentLength=10, k1Len=50, v1Len=50 (malformed, wrong length)
1, 4, 0, 1, 0, 10, 0, 0, 50, 50, 3, 4, 5, 6, 7, 8, 9, 10,
// end of params
1, 4, 0, 1, 0, 0, 0, 0,
}
rw := rwNopCloser{bytes.NewReader(input), ioutil.Discard}
c := newChild(rw, http.DefaultServeMux)
c.serve()
}

View File

@ -112,6 +112,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
statusPath string statusPath string
) )
var err error
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
@ -120,7 +121,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
socketAddr := strings.Split(u.Host, ":") socketAddr := strings.Split(u.Host, ":")
fcgiIp := socketAddr[0] fcgiIp := socketAddr[0]
fcgiPort, _ := strconv.Atoi(socketAddr[1]) fcgiPort, _ := strconv.Atoi(socketAddr[1])
fcgi, _ = NewClient(fcgiIp, fcgiPort) fcgi, err = newFcgiClient(fcgiIp, fcgiPort)
} else { } else {
socketAddr := strings.Split(addr, ":") socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 { if len(socketAddr) >= 2 {
@ -134,8 +135,13 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
if _, err := os.Stat(socketPath); os.IsNotExist(err) { if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
} }
fcgi, _ = NewClient("unix", socketPath) fcgi, err = newFcgiClient("unix", socketPath)
} }
if err != nil {
return err
}
return g.gatherFcgi(fcgi, statusPath, acc) return g.gatherFcgi(fcgi, statusPath, acc)
} }