http listener refactor

in this commit:

- chunks out the http request body to avoid making very large
  allocations.
- establishes a limit for the maximum http request body size that the
  listener will accept.
- utilizes a pool of byte buffers to reduce GC pressure.
This commit is contained in:
Cameron Sparr 2016-10-18 12:22:23 +01:00
parent babd37bf35
commit 097b1e09db
7 changed files with 287 additions and 167 deletions

View File

@ -1,6 +1,8 @@
package buffer package buffer
import ( import (
"sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
@ -11,6 +13,8 @@ type Buffer struct {
drops int drops int
// total metrics added // total metrics added
total int total int
sync.Mutex
} }
// NewBuffer returns a Buffer // NewBuffer returns a Buffer
@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
// the batch will be of maximum length batchSize. It can be less than batchSize, // the batch will be of maximum length batchSize. It can be less than batchSize,
// if the length of Buffer is less than batchSize. // if the length of Buffer is less than batchSize.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric { func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
n := min(len(b.buf), batchSize) n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n) out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
out[i] = <-b.buf out[i] = <-b.buf
} }
b.Unlock()
return out return out
} }

View File

@ -0,0 +1,43 @@
package http_listener
import (
"sync/atomic"
)
type pool struct {
buffers chan []byte
size int
created int64
}
// NewPool returns a new pool object.
// n is the number of buffers
// bufSize is the size (in bytes) of each buffer
func NewPool(n, bufSize int) *pool {
return &pool{
buffers: make(chan []byte, n),
size: bufSize,
}
}
func (p *pool) get() []byte {
select {
case b := <-p.buffers:
return b
default:
atomic.AddInt64(&p.created, 1)
return make([]byte, p.size)
}
}
func (p *pool) put(b []byte) {
select {
case p.buffers <- b:
default:
}
}
func (p *pool) ncreated() int64 {
return atomic.LoadInt64(&p.created)
}

View File

@ -1,9 +1,9 @@
package http_listener package http_listener
import ( import (
"bufio"
"bytes" "bytes"
"fmt" "compress/gzip"
"io"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -13,135 +13,138 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/http_listener/stoppableListener" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers" )
const (
// DEFAULT_MAX_BODY_SIZE is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 500 MB
DEFAULT_MAX_BODY_SIZE = 500 * 1024 * 1024
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024
) )
type HttpListener struct { type HttpListener struct {
ServiceAddress string ServiceAddress string
ReadTimeout internal.Duration ReadTimeout internal.Duration
WriteTimeout internal.Duration WriteTimeout internal.Duration
MaxBodySize int64
MaxLineSize int
sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
listener *stoppableListener.StoppableListener listener net.Listener
parser parsers.Parser parser influx.InfluxParser
acc telegraf.Accumulator acc telegraf.Accumulator
pool *pool
} }
const sampleConfig = ` const sampleConfig = `
## Address and port to host HTTP listener on ## Address and port to host HTTP listener on
service_address = ":8186" service_address = ":8186"
## timeouts ## maximum duration before timing out read of the request
read_timeout = "10s" read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s" write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
max_body_size = 0
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = 0
` `
func (t *HttpListener) SampleConfig() string { func (h *HttpListener) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (t *HttpListener) Description() string { func (h *HttpListener) Description() string {
return "Influx HTTP write listener" return "Influx HTTP write listener"
} }
func (t *HttpListener) Gather(_ telegraf.Accumulator) error { func (h *HttpListener) Gather(_ telegraf.Accumulator) error {
log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated())
return nil return nil
} }
func (t *HttpListener) SetParser(parser parsers.Parser) {
t.parser = parser
}
// Start starts the http listener service. // Start starts the http listener service.
func (t *HttpListener) Start(acc telegraf.Accumulator) error { func (h *HttpListener) Start(acc telegraf.Accumulator) error {
t.Lock() h.mu.Lock()
defer t.Unlock() defer h.mu.Unlock()
h.parser = influx.InfluxParser{}
t.acc = acc if h.MaxBodySize == 0 {
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
}
if h.MaxLineSize == 0 {
h.MaxLineSize = DEFAULT_MAX_LINE_SIZE
}
var rawListener, err = net.Listen("tcp", t.ServiceAddress) h.acc = acc
if err != nil { h.pool = NewPool(200, h.MaxLineSize)
return err
} var listener, err = net.Listen("tcp", h.ServiceAddress)
t.listener, err = stoppableListener.New(rawListener)
if err != nil { if err != nil {
return err return err
} }
h.listener = listener
go t.httpListen() h.wg.Add(1)
go func() {
defer h.wg.Done()
h.httpListen()
}()
log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress) log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress)
return nil return nil
} }
// Stop cleans up all resources // Stop cleans up all resources
func (t *HttpListener) Stop() { func (h *HttpListener) Stop() {
t.Lock() h.mu.Lock()
defer t.Unlock() defer h.mu.Unlock()
t.listener.Stop() h.listener.Close()
t.listener.Close() h.wg.Wait()
t.wg.Wait() log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress)
log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
} }
// httpListen listens for HTTP requests. // httpListen sets up an http.Server and calls server.Serve.
func (t *HttpListener) httpListen() error { // like server.Serve, httpListen will always return a non-nil error, for this
if t.ReadTimeout.Duration < time.Second { // reason, the error returned should probably be ignored.
t.ReadTimeout.Duration = time.Second * 10 // see https://golang.org/pkg/net/http/#Server.Serve
func (h *HttpListener) httpListen() error {
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
} }
if t.WriteTimeout.Duration < time.Second { if h.WriteTimeout.Duration < time.Second {
t.WriteTimeout.Duration = time.Second * 10 h.WriteTimeout.Duration = time.Second * 10
} }
var server = http.Server{ var server = http.Server{
Handler: t, Handler: h,
ReadTimeout: t.ReadTimeout.Duration, ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: t.WriteTimeout.Duration, WriteTimeout: h.WriteTimeout.Duration,
} }
return server.Serve(t.listener) return server.Serve(h.listener)
} }
func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
switch req.URL.Path { switch req.URL.Path {
case "/write": case "/write":
var http400msg bytes.Buffer h.serveWrite(res, req)
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
}
if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
res.WriteHeader(http.StatusNoContent)
}
case "/query": case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB // Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query // clients test endpoint availability with a query
@ -158,8 +161,134 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
} }
} }
func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize {
tooLarge(res)
return
}
now := time.Now()
// Handle gzip request bodies
body := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
body, err := gzip.NewReader(req.Body)
defer body.Close()
if err != nil {
log.Println("E! " + err.Error())
badRequest(res)
return
}
}
body = http.MaxBytesReader(res, body, h.MaxBodySize)
var return400 bool
var hangingBytes bool
buf := h.pool.get()
defer func() { h.pool.put(buf) }()
bufStart := 0
for {
n, err := io.ReadFull(body, buf[bufStart:])
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
log.Println("E! " + err.Error())
// problem reading the request body
badRequest(res)
return
}
if err == io.EOF {
if return400 {
badRequest(res)
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
if hangingBytes {
i := bytes.IndexByte(buf, '\n')
if i == -1 {
// still didn't find a newline, keep scanning
continue
}
// rotate the bit remaining after the first newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
hangingBytes = false
continue
}
if err == io.ErrUnexpectedEOF {
// finished reading the request body
if err := h.parse(buf[:n+bufStart], now); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
if return400 {
badRequest(res)
} else {
res.WriteHeader(http.StatusNoContent)
}
return
}
// if we got down here it means that we filled our buffer, and there
// are still bytes remaining to be read. So we will parse up until the
// final newline, then push the rest of the bytes into the next buffer.
i := bytes.LastIndexByte(buf, '\n')
if i == -1 {
// drop any line longer than the max buffer size
log.Printf("E! http_listener received a single line longer than the maximum of %d bytes",
len(buf))
hangingBytes = true
return400 = true
bufStart = 0
continue
}
if err := h.parse(buf[:i], now); err != nil {
log.Println("E! " + err.Error())
return400 = true
}
// rotate the bit remaining after the last newline to the front of the buffer
i++ // start copying after the newline
bufStart = len(buf) - i
if bufStart > 0 {
copy(buf, buf[i:])
}
}
}
func (h *HttpListener) parse(b []byte, t time.Time) error {
metrics, err := h.parser.ParseWithDefaultTime(b, t)
for _, m := range metrics {
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return err
}
func tooLarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}
func badRequest(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(`{"error":"http: bad request"}`))
}
func init() { func init() {
inputs.Add("http_listener", func() telegraf.Input { inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{} return &HttpListener{
ServiceAddress: ":8186",
}
}) })
} }

View File

@ -1,16 +1,15 @@
package http_listener package http_listener
import ( import (
"bytes"
"net/http"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"bytes"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"net/http"
) )
const ( const (
@ -36,8 +35,6 @@ func newTestHttpListener() *HttpListener {
func TestWriteHTTP(t *testing.T) { func TestWriteHTTP(t *testing.T) {
listener := newTestHttpListener() listener := newTestHttpListener()
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -71,10 +68,10 @@ func TestWriteHTTP(t *testing.T) {
) )
} }
// Post a gigantic metric to the listener: // Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
time.Sleep(time.Millisecond * 15) time.Sleep(time.Millisecond * 15)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
@ -83,11 +80,27 @@ func TestWriteHTTP(t *testing.T) {
) )
} }
func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
listener := &HttpListener{
ServiceAddress: ":8296",
MaxLineSize: 128 * 1000,
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// Post a gigantic metric to the listener and verify that an error is returned:
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
// writes 25,000 metrics to the listener with 10 different writers // writes 25,000 metrics to the listener with 10 different writers
func TestWriteHTTPHighTraffic(t *testing.T) { func TestWriteHTTPHighTraffic(t *testing.T) {
listener := &HttpListener{ServiceAddress: ":8286"} listener := &HttpListener{ServiceAddress: ":8286"}
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -118,7 +131,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
func TestReceive404ForInvalidEndpoint(t *testing.T) { func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHttpListener() listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -136,7 +148,6 @@ func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 250) time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener() listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -154,7 +165,6 @@ func TestWriteHTTPEmpty(t *testing.T) {
time.Sleep(time.Millisecond * 250) time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener() listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -172,7 +182,6 @@ func TestQueryAndPingHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 250) time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener() listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))

View File

@ -1,10 +0,0 @@
Copyright (c) 2014, Eric Urban
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,62 +0,0 @@
package stoppableListener
import (
"errors"
"net"
"time"
)
type StoppableListener struct {
*net.TCPListener //Wrapped listener
stop chan int //Channel used only to indicate listener should shutdown
}
func New(l net.Listener) (*StoppableListener, error) {
tcpL, ok := l.(*net.TCPListener)
if !ok {
return nil, errors.New("Cannot wrap listener")
}
retval := &StoppableListener{}
retval.TCPListener = tcpL
retval.stop = make(chan int)
return retval, nil
}
var StoppedError = errors.New("Listener stopped")
func (sl *StoppableListener) Accept() (net.Conn, error) {
for {
//Wait up to one second for a new connection
sl.SetDeadline(time.Now().Add(time.Second))
newConn, err := sl.TCPListener.Accept()
//Check for the channel being closed
select {
case <-sl.stop:
return nil, StoppedError
default:
//If the channel is still open, continue as normal
}
if err != nil {
netErr, ok := err.(net.Error)
//If this is a timeout, then continue to wait for
//new connections
if ok && netErr.Timeout() && netErr.Temporary() {
continue
}
}
return newConn, err
}
}
func (sl *StoppableListener) Stop() {
close(sl.stop)
}

View File

@ -3,6 +3,7 @@ package influx
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -15,15 +16,10 @@ type InfluxParser struct {
DefaultTags map[string]string DefaultTags map[string]string
} }
// Parse returns a slice of Metrics from a text representation of a func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline // parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n")) buf = bytes.TrimPrefix(buf, []byte("\n"))
points, err := models.ParsePoints(buf) points, err := models.ParsePointsWithPrecision(buf, t, "n")
metrics := make([]telegraf.Metric, len(points)) metrics := make([]telegraf.Metric, len(points))
for i, point := range points { for i, point := range points {
for k, v := range p.DefaultTags { for k, v := range p.DefaultTags {
@ -39,6 +35,15 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, err return metrics, err
} }
// Parse returns a slice of Metrics from a text representation of a
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return p.ParseWithDefaultTime(buf, time.Now())
}
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n")) metrics, err := p.Parse([]byte(line + "\n"))