From 805db7ca5071eaa2eb9275ca212dab10a230c7b9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Mar 2016 13:11:59 +0100 Subject: [PATCH] Break out fcgi code into orig Go files, don't ignore errs closes #816 --- plugins/inputs/phpfpm/child.go | 331 ++++++++++++++++++ .../inputs/phpfpm/{phpfpm_fcgi.go => fcgi.go} | 76 ---- plugins/inputs/phpfpm/fcgi_client.go | 86 +++++ plugins/inputs/phpfpm/fcgi_test.go | 280 +++++++++++++++ plugins/inputs/phpfpm/phpfpm.go | 10 +- 5 files changed, 705 insertions(+), 78 deletions(-) create mode 100644 plugins/inputs/phpfpm/child.go rename plugins/inputs/phpfpm/{phpfpm_fcgi.go => fcgi.go} (79%) create mode 100644 plugins/inputs/phpfpm/fcgi_client.go create mode 100644 plugins/inputs/phpfpm/fcgi_test.go diff --git a/plugins/inputs/phpfpm/child.go b/plugins/inputs/phpfpm/child.go new file mode 100644 index 000000000..2ebdf2ffb --- /dev/null +++ b/plugins/inputs/phpfpm/child.go @@ -0,0 +1,331 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +// This file implements FastCGI from the perspective of a child process. + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/cgi" + "os" + "strings" + "sync" + "time" +) + +// request holds the state for an in-progress request. As soon as it's complete, +// it's converted to an http.Request. +type request struct { + pw *io.PipeWriter + reqId uint16 + params map[string]string + buf [1024]byte + rawParams []byte + keepConn bool +} + +func newRequest(reqId uint16, flags uint8) *request { + r := &request{ + reqId: reqId, + params: map[string]string{}, + keepConn: flags&flagKeepConn != 0, + } + r.rawParams = r.buf[:0] + return r +} + +// parseParams reads an encoded []byte into Params. +func (r *request) parseParams() { + text := r.rawParams + r.rawParams = nil + for len(text) > 0 { + keyLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + valLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + if int(keyLen)+int(valLen) > len(text) { + return + } + key := readString(text, keyLen) + text = text[keyLen:] + val := readString(text, valLen) + text = text[valLen:] + r.params[key] = val + } +} + +// response implements http.ResponseWriter. +type response struct { + req *request + header http.Header + w *bufWriter + wroteHeader bool +} + +func newResponse(c *child, req *request) *response { + return &response{ + req: req, + header: http.Header{}, + w: newWriter(c.conn, typeStdout, req.reqId), + } +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(data []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + return r.w.Write(data) +} + +func (r *response) WriteHeader(code int) { + if r.wroteHeader { + return + } + r.wroteHeader = true + if code == http.StatusNotModified { + // Must not have body. + r.header.Del("Content-Type") + r.header.Del("Content-Length") + r.header.Del("Transfer-Encoding") + } else if r.header.Get("Content-Type") == "" { + r.header.Set("Content-Type", "text/html; charset=utf-8") + } + + if r.header.Get("Date") == "" { + r.header.Set("Date", time.Now().UTC().Format(http.TimeFormat)) + } + + fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code)) + r.header.Write(r.w) + r.w.WriteString("\r\n") +} + +func (r *response) Flush() { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + r.w.Flush() +} + +func (r *response) Close() error { + r.Flush() + return r.w.Close() +} + +type child struct { + conn *conn + handler http.Handler + + mu sync.Mutex // protects requests: + requests map[uint16]*request // keyed by request ID +} + +func newChild(rwc io.ReadWriteCloser, handler http.Handler) *child { + return &child{ + conn: newConn(rwc), + handler: handler, + requests: make(map[uint16]*request), + } +} + +func (c *child) serve() { + defer c.conn.Close() + defer c.cleanUp() + var rec record + for { + if err := rec.read(c.conn.rwc); err != nil { + return + } + if err := c.handleRecord(&rec); err != nil { + return + } + } +} + +var errCloseConn = errors.New("fcgi: connection should be closed") + +var emptyBody = ioutil.NopCloser(strings.NewReader("")) + +// ErrRequestAborted is returned by Read when a handler attempts to read the +// body of a request that has been aborted by the web server. +var ErrRequestAborted = errors.New("fcgi: request aborted by web server") + +// ErrConnClosed is returned by Read when a handler attempts to read the body of +// a request after the connection to the web server has been closed. +var ErrConnClosed = errors.New("fcgi: connection to web server closed") + +func (c *child) handleRecord(rec *record) error { + c.mu.Lock() + req, ok := c.requests[rec.h.Id] + c.mu.Unlock() + if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues { + // The spec says to ignore unknown request IDs. + return nil + } + + switch rec.h.Type { + case typeBeginRequest: + if req != nil { + // The server is trying to begin a request with the same ID + // as an in-progress request. This is an error. + return errors.New("fcgi: received ID that is already in-flight") + } + + var br beginRequest + if err := br.read(rec.content()); err != nil { + return err + } + if br.role != roleResponder { + c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole) + return nil + } + req = newRequest(rec.h.Id, br.flags) + c.mu.Lock() + c.requests[rec.h.Id] = req + c.mu.Unlock() + return nil + case typeParams: + // NOTE(eds): Technically a key-value pair can straddle the boundary + // between two packets. We buffer until we've received all parameters. + if len(rec.content()) > 0 { + req.rawParams = append(req.rawParams, rec.content()...) + return nil + } + req.parseParams() + return nil + case typeStdin: + content := rec.content() + if req.pw == nil { + var body io.ReadCloser + if len(content) > 0 { + // body could be an io.LimitReader, but it shouldn't matter + // as long as both sides are behaving. + body, req.pw = io.Pipe() + } else { + body = emptyBody + } + go c.serveRequest(req, body) + } + if len(content) > 0 { + // TODO(eds): This blocks until the handler reads from the pipe. + // If the handler takes a long time, it might be a problem. + req.pw.Write(content) + } else if req.pw != nil { + req.pw.Close() + } + return nil + case typeGetValues: + values := map[string]string{"FCGI_MPXS_CONNS": "1"} + c.conn.writePairs(typeGetValuesResult, 0, values) + return nil + case typeData: + // If the filter role is implemented, read the data stream here. + return nil + case typeAbortRequest: + c.mu.Lock() + delete(c.requests, rec.h.Id) + c.mu.Unlock() + c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete) + if req.pw != nil { + req.pw.CloseWithError(ErrRequestAborted) + } + if !req.keepConn { + // connection will close upon return + return errCloseConn + } + return nil + default: + b := make([]byte, 8) + b[0] = byte(rec.h.Type) + c.conn.writeRecord(typeUnknownType, 0, b) + return nil + } +} + +func (c *child) serveRequest(req *request, body io.ReadCloser) { + r := newResponse(c, req) + httpReq, err := cgi.RequestFromMap(req.params) + if err != nil { + // there was an error reading the request + r.WriteHeader(http.StatusInternalServerError) + c.conn.writeRecord(typeStderr, req.reqId, []byte(err.Error())) + } else { + httpReq.Body = body + c.handler.ServeHTTP(r, httpReq) + } + r.Close() + c.mu.Lock() + delete(c.requests, req.reqId) + c.mu.Unlock() + c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete) + + // Consume the entire body, so the host isn't still writing to + // us when we close the socket below in the !keepConn case, + // otherwise we'd send a RST. (golang.org/issue/4183) + // TODO(bradfitz): also bound this copy in time. Or send + // some sort of abort request to the host, so the host + // can properly cut off the client sending all the data. + // For now just bound it a little and + io.CopyN(ioutil.Discard, body, 100<<20) + body.Close() + + if !req.keepConn { + c.conn.Close() + } +} + +func (c *child) cleanUp() { + c.mu.Lock() + defer c.mu.Unlock() + for _, req := range c.requests { + if req.pw != nil { + // race with call to Close in c.serveRequest doesn't matter because + // Pipe(Reader|Writer).Close are idempotent + req.pw.CloseWithError(ErrConnClosed) + } + } +} + +// Serve accepts incoming FastCGI connections on the listener l, creating a new +// goroutine for each. The goroutine reads requests and then calls handler +// to reply to them. +// If l is nil, Serve accepts connections from os.Stdin. +// If handler is nil, http.DefaultServeMux is used. +func Serve(l net.Listener, handler http.Handler) error { + if l == nil { + var err error + l, err = net.FileListener(os.Stdin) + if err != nil { + return err + } + defer l.Close() + } + if handler == nil { + handler = http.DefaultServeMux + } + for { + rw, err := l.Accept() + if err != nil { + return err + } + c := newChild(rw, handler) + go c.serve() + } +} diff --git a/plugins/inputs/phpfpm/phpfpm_fcgi.go b/plugins/inputs/phpfpm/fcgi.go similarity index 79% rename from plugins/inputs/phpfpm/phpfpm_fcgi.go rename to plugins/inputs/phpfpm/fcgi.go index 03aac7634..689660ea0 100644 --- a/plugins/inputs/phpfpm/phpfpm_fcgi.go +++ b/plugins/inputs/phpfpm/fcgi.go @@ -17,11 +17,6 @@ import ( "errors" "io" "sync" - - "net" - "strconv" - - "strings" ) // recType is a record type, as defined by @@ -277,74 +272,3 @@ func (w *streamWriter) Close() error { // send empty record to close the stream return w.c.writeRecord(w.recType, w.reqId, nil) } - -func NewClient(h string, args ...interface{}) (fcgi *conn, err error) { - var con net.Conn - if len(args) != 1 { - err = errors.New("fcgi: not enough params") - return - } - switch args[0].(type) { - case int: - addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) - con, err = net.Dial("tcp", addr) - case string: - laddr := net.UnixAddr{Name: args[0].(string), Net: h} - con, err = net.DialUnix(h, nil, &laddr) - default: - err = errors.New("fcgi: we only accept int (port) or string (socket) params.") - } - fcgi = &conn{ - rwc: con, - } - return -} - -func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) { - defer client.rwc.Close() - var reqId uint16 = 1 - - err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) - if err != nil { - return - } - - err = client.writePairs(typeParams, reqId, env) - if err != nil { - return - } - - if len(requestData) > 0 { - if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { - return - } - } - - rec := &record{} - var err1 error - - // recive untill EOF or FCGI_END_REQUEST -READ_LOOP: - for { - err1 = rec.read(client.rwc) - if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { - if err1 != io.EOF { - err = err1 - } - break - } - - switch { - case rec.h.Type == typeStdout: - retout = append(retout, rec.content()...) - case rec.h.Type == typeStderr: - reterr = append(reterr, rec.content()...) - case rec.h.Type == typeEndRequest: - fallthrough - default: - break READ_LOOP - } - } - - return -} diff --git a/plugins/inputs/phpfpm/fcgi_client.go b/plugins/inputs/phpfpm/fcgi_client.go new file mode 100644 index 000000000..56978ad3a --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_client.go @@ -0,0 +1,86 @@ +package phpfpm + +import ( + "errors" + "io" + "net" + "strconv" + "strings" +) + +// Create an fcgi client +func newFcgiClient(h string, args ...interface{}) (*conn, error) { + var con net.Conn + if len(args) != 1 { + return nil, errors.New("fcgi: not enough params") + } + + var err error + switch args[0].(type) { + case int: + addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) + con, err = net.Dial("tcp", addr) + case string: + laddr := net.UnixAddr{Name: args[0].(string), Net: h} + con, err = net.DialUnix(h, nil, &laddr) + default: + err = errors.New("fcgi: we only accept int (port) or string (socket) params.") + } + fcgi := &conn{ + rwc: con, + } + + return fcgi, err +} + +func (client *conn) Request( + env map[string]string, + requestData string, +) (retout []byte, reterr []byte, err error) { + defer client.rwc.Close() + var reqId uint16 = 1 + + err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) + if err != nil { + return + } + + err = client.writePairs(typeParams, reqId, env) + if err != nil { + return + } + + if len(requestData) > 0 { + if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { + return + } + } + + rec := &record{} + var err1 error + + // recive untill EOF or FCGI_END_REQUEST +READ_LOOP: + for { + err1 = rec.read(client.rwc) + if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { + if err1 != io.EOF { + err = err1 + } + break + } + + switch { + case rec.h.Type == typeStdout: + retout = append(retout, rec.content()...) + case rec.h.Type == typeStderr: + reterr = append(reterr, rec.content()...) + case rec.h.Type == typeEndRequest: + fallthrough + default: + break READ_LOOP + } + } + + return +} diff --git a/plugins/inputs/phpfpm/fcgi_test.go b/plugins/inputs/phpfpm/fcgi_test.go new file mode 100644 index 000000000..15e0030a7 --- /dev/null +++ b/plugins/inputs/phpfpm/fcgi_test.go @@ -0,0 +1,280 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package phpfpm + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "net/http" + "testing" +) + +var sizeTests = []struct { + size uint32 + bytes []byte +}{ + {0, []byte{0x00}}, + {127, []byte{0x7F}}, + {128, []byte{0x80, 0x00, 0x00, 0x80}}, + {1000, []byte{0x80, 0x00, 0x03, 0xE8}}, + {33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}}, +} + +func TestSize(t *testing.T) { + b := make([]byte, 4) + for i, test := range sizeTests { + n := encodeSize(b, test.size) + if !bytes.Equal(b[:n], test.bytes) { + t.Errorf("%d expected %x, encoded %x", i, test.bytes, b) + } + size, n := readSize(test.bytes) + if size != test.size { + t.Errorf("%d expected %d, read %d", i, test.size, size) + } + if len(test.bytes) != n { + t.Errorf("%d did not consume all the bytes", i) + } + } +} + +var streamTests = []struct { + desc string + recType recType + reqId uint16 + content []byte + raw []byte +}{ + {"single record", typeStdout, 1, nil, + []byte{1, byte(typeStdout), 0, 1, 0, 0, 0, 0}, + }, + // this data will have to be split into two records + {"two records", typeStdin, 300, make([]byte, 66000), + bytes.Join([][]byte{ + // header for the first record + {1, byte(typeStdin), 0x01, 0x2C, 0xFF, 0xFF, 1, 0}, + make([]byte, 65536), + // header for the second + {1, byte(typeStdin), 0x01, 0x2C, 0x01, 0xD1, 7, 0}, + make([]byte, 472), + // header for the empty record + {1, byte(typeStdin), 0x01, 0x2C, 0, 0, 0, 0}, + }, + nil), + }, +} + +type nilCloser struct { + io.ReadWriter +} + +func (c *nilCloser) Close() error { return nil } + +func TestStreams(t *testing.T) { + var rec record +outer: + for _, test := range streamTests { + buf := bytes.NewBuffer(test.raw) + var content []byte + for buf.Len() > 0 { + if err := rec.read(buf); err != nil { + t.Errorf("%s: error reading record: %v", test.desc, err) + continue outer + } + content = append(content, rec.content()...) + } + if rec.h.Type != test.recType { + t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType) + continue + } + if rec.h.Id != test.reqId { + t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId) + continue + } + if !bytes.Equal(content, test.content) { + t.Errorf("%s: read wrong content", test.desc) + continue + } + buf.Reset() + c := newConn(&nilCloser{buf}) + w := newWriter(c, test.recType, test.reqId) + if _, err := w.Write(test.content); err != nil { + t.Errorf("%s: error writing record: %v", test.desc, err) + continue + } + if err := w.Close(); err != nil { + t.Errorf("%s: error closing stream: %v", test.desc, err) + continue + } + if !bytes.Equal(buf.Bytes(), test.raw) { + t.Errorf("%s: wrote wrong content", test.desc) + } + } +} + +type writeOnlyConn struct { + buf []byte +} + +func (c *writeOnlyConn) Write(p []byte) (int, error) { + c.buf = append(c.buf, p...) + return len(p), nil +} + +func (c *writeOnlyConn) Read(p []byte) (int, error) { + return 0, errors.New("conn is write-only") +} + +func (c *writeOnlyConn) Close() error { + return nil +} + +func TestGetValues(t *testing.T) { + var rec record + rec.h.Type = typeGetValues + + wc := new(writeOnlyConn) + c := newChild(wc, nil) + err := c.handleRecord(&rec) + if err != nil { + t.Fatalf("handleRecord: %v", err) + } + + const want = "\x01\n\x00\x00\x00\x12\x06\x00" + + "\x0f\x01FCGI_MPXS_CONNS1" + + "\x00\x00\x00\x00\x00\x00\x01\n\x00\x00\x00\x00\x00\x00" + if got := string(wc.buf); got != want { + t.Errorf(" got: %q\nwant: %q\n", got, want) + } +} + +func nameValuePair11(nameData, valueData string) []byte { + return bytes.Join( + [][]byte{ + {byte(len(nameData)), byte(len(valueData))}, + []byte(nameData), + []byte(valueData), + }, + nil, + ) +} + +func makeRecord( + recordType recType, + requestId uint16, + contentData []byte, +) []byte { + requestIdB1 := byte(requestId >> 8) + requestIdB0 := byte(requestId) + + contentLength := len(contentData) + contentLengthB1 := byte(contentLength >> 8) + contentLengthB0 := byte(contentLength) + return bytes.Join([][]byte{ + {1, byte(recordType), requestIdB1, requestIdB0, contentLengthB1, + contentLengthB0, 0, 0}, + contentData, + }, + nil) +} + +// a series of FastCGI records that start a request and begin sending the +// request body +var streamBeginTypeStdin = bytes.Join([][]byte{ + // set up request 1 + makeRecord(typeBeginRequest, 1, + []byte{0, byte(roleResponder), 0, 0, 0, 0, 0, 0}), + // add required parameters to request 1 + makeRecord(typeParams, 1, nameValuePair11("REQUEST_METHOD", "GET")), + makeRecord(typeParams, 1, nameValuePair11("SERVER_PROTOCOL", "HTTP/1.1")), + makeRecord(typeParams, 1, nil), + // begin sending body of request 1 + makeRecord(typeStdin, 1, []byte("0123456789abcdef")), +}, + nil) + +var cleanUpTests = []struct { + input []byte + err error +}{ + // confirm that child.handleRecord closes req.pw after aborting req + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + makeRecord(typeAbortRequest, 1, nil), + }, + nil), + ErrRequestAborted, + }, + // confirm that child.serve closes all pipes after error reading record + { + bytes.Join([][]byte{ + streamBeginTypeStdin, + nil, + }, + nil), + ErrConnClosed, + }, +} + +type nopWriteCloser struct { + io.ReadWriter +} + +func (nopWriteCloser) Close() error { + return nil +} + +// Test that child.serve closes the bodies of aborted requests and closes the +// bodies of all requests before returning. Causes deadlock if either condition +// isn't met. See issue 6934. +func TestChildServeCleansUp(t *testing.T) { + for _, tt := range cleanUpTests { + input := make([]byte, len(tt.input)) + copy(input, tt.input) + rc := nopWriteCloser{bytes.NewBuffer(input)} + done := make(chan bool) + c := newChild(rc, http.HandlerFunc(func( + w http.ResponseWriter, + r *http.Request, + ) { + // block on reading body of request + _, err := io.Copy(ioutil.Discard, r.Body) + if err != tt.err { + t.Errorf("Expected %#v, got %#v", tt.err, err) + } + // not reached if body of request isn't closed + done <- true + })) + go c.serve() + // wait for body of request to be closed or all goroutines to block + <-done + } +} + +type rwNopCloser struct { + io.Reader + io.Writer +} + +func (rwNopCloser) Close() error { + return nil +} + +// Verifies it doesn't crash. Issue 11824. +func TestMalformedParams(t *testing.T) { + input := []byte{ + // beginRequest, requestId=1, contentLength=8, role=1, keepConn=1 + 1, 1, 0, 1, 0, 8, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, + // params, requestId=1, contentLength=10, k1Len=50, v1Len=50 (malformed, wrong length) + 1, 4, 0, 1, 0, 10, 0, 0, 50, 50, 3, 4, 5, 6, 7, 8, 9, 10, + // end of params + 1, 4, 0, 1, 0, 0, 0, 0, + } + rw := rwNopCloser{bytes.NewReader(input), ioutil.Discard} + c := newChild(rw, http.DefaultServeMux) + c.serve() +} diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index c07262342..199b0005b 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -112,6 +112,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { statusPath string ) + var err error if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { u, err := url.Parse(addr) if err != nil { @@ -120,7 +121,7 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { socketAddr := strings.Split(u.Host, ":") fcgiIp := socketAddr[0] fcgiPort, _ := strconv.Atoi(socketAddr[1]) - fcgi, _ = NewClient(fcgiIp, fcgiPort) + fcgi, err = newFcgiClient(fcgiIp, fcgiPort) } else { socketAddr := strings.Split(addr, ":") if len(socketAddr) >= 2 { @@ -134,8 +135,13 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if _, err := os.Stat(socketPath); os.IsNotExist(err) { return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) } - fcgi, _ = NewClient("unix", socketPath) + fcgi, err = newFcgiClient("unix", socketPath) } + + if err != nil { + return err + } + return g.gatherFcgi(fcgi, statusPath, acc) }