Limit the maximum size of a single http body read

If the content-length of the http request is smaller than the maximum
allocation size, then we will read the entire body at once.

If the content-length of the http request is larger than the maximum
allocation size, then we will read the request body in chunks no larger
than 10MB at a time. This is to prevent the http handler from creating
huge allocations and potentially OOMing if a user posts a large file
of metrics to the /write endpoint.
Cameron Sparr 2016-10-14 15:35:27 +01:00
parent c73964c12d
commit dee15aa224
3 changed files with 155 additions and 44 deletions
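The strategy the commit message describes can be sketched in isolation: read the whole body at once when Content-Length is small, otherwise stream through a fixed-size buffer and hand off chunks at line boundaries. The sketch below is illustrative only (`maxAlloc`, `readBounded`, and `handle` are invented names, and the 32-byte cap is deliberately tiny so the chunking is visible); the plugin's real implementation follows in the http_listener.go diff below.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// maxAlloc caps the size of any single buffer allocated while reading a body
// (10 MB in the commit; tiny here so the example output shows the chunking).
const maxAlloc = 32

// readBounded reads r without ever holding more than maxAlloc bytes at once,
// flushing to handle at newline boundaries where possible.
func readBounded(r io.Reader, contentLength int, handle func([]byte)) error {
	if contentLength < maxAlloc {
		// small body: read it all in one allocation
		buf := bytes.NewBuffer(make([]byte, 0, contentLength+1))
		if _, err := buf.ReadFrom(r); err != nil {
			return err
		}
		handle(buf.Bytes())
		return nil
	}
	buf := bytes.NewBuffer(make([]byte, 0, maxAlloc))
	br := bufio.NewReader(r)
	for {
		b, err := br.ReadByte()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		buf.WriteByte(b)
		// flush on a newline near the end of the buffer, or when it is full
		if (b == '\n' && buf.Len() > maxAlloc-8) || buf.Len() == buf.Cap() {
			handle(buf.Bytes())
			buf.Reset()
		}
	}
	if buf.Len() > 0 {
		handle(buf.Bytes()) // final partial chunk
	}
	return nil
}

func main() {
	body := strings.Repeat("cpu value=1\n", 10)
	if err := readBounded(strings.NewReader(body), len(body), func(chunk []byte) {
		fmt.Printf("handled chunk of %d bytes\n", len(chunk))
	}); err != nil {
		fmt.Println("read error:", err)
	}
}
```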

plugins/inputs/http_listener/README.md

@@ -3,9 +3,11 @@
 The HTTP listener is a service input plugin that listens for messages sent via HTTP POST.
 The plugin expects messages in the InfluxDB line-protocol ONLY, other Telegraf input data formats are not supported.
 The intent of the plugin is to allow Telegraf to serve as a proxy/router for the /write endpoint of the InfluxDB HTTP API.
-When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.
+When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed.
+The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.
 See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).
 
 Example: curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
 
 ### Configuration:
@@ -19,6 +21,12 @@ This is a sample configuration for the plugin.
   service_address = ":8186"
 
   ## timeouts
+  ## maximum duration before timing out read of the request
   read_timeout = "10s"
+  ## maximum duration before timing out write of the response
   write_timeout = "10s"
+
+  ## Maximum allowed http request body size in bytes.
+  ## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
+  max_body_size = 0
 ```
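Assuming a listener running locally with a deliberately small limit (say `max_body_size = 1024`), an oversized write should come back as 413 rather than being buffered; a hypothetical client-side check:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumes a listener on :8186 configured with a small max_body_size,
	// e.g. max_body_size = 1024, so this ~29 kB body exceeds the limit.
	payload := bytes.Repeat([]byte("cpu,host=server01 value=0.64\n"), 1000)
	resp, err := http.Post("http://localhost:8186/write", "", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	defer resp.Body.Close()
	// Expect 413 (Request Entity Too Large) once the body exceeds max_body_size.
	fmt.Println("status:", resp.StatusCode)
}
```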

plugins/inputs/http_listener/http_listener.go

@@ -1,9 +1,11 @@
 package http_listener
 
 import (
+	"bufio"
 	"bytes"
 	"compress/gzip"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"net/http"
@@ -17,15 +19,25 @@ import (
 	"github.com/influxdata/telegraf/plugins/parsers"
 )
 
-const MAX_REQUEST_BODY_SIZE = 50 * 1024 * 1024
+const (
+	// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
+	// if the request body is over this size, we will return an HTTP 413 error.
+	// 1 GB
+	DEFAULT_REQUEST_BODY_MAX = 1 * 1000 * 1000 * 1000
+
+	// MAX_ALLOCATION_SIZE is the maximum size, in bytes, of a single allocation
+	// of bytes that will be made handling a single HTTP request.
+	// 10 MB
+	MAX_ALLOCATION_SIZE = 10 * 1000 * 1000
+)
 
 type HttpListener struct {
 	ServiceAddress string
 	ReadTimeout    internal.Duration
 	WriteTimeout   internal.Duration
+	MaxBodySize    int64
 
 	sync.Mutex
-	wg sync.WaitGroup
 
 	listener *stoppableListener.StoppableListener
@@ -38,8 +50,14 @@ const sampleConfig = `
   service_address = ":8186"
 
   ## timeouts
+  ## maximum duration before timing out read of the request
   read_timeout = "10s"
+  ## maximum duration before timing out write of the response
   write_timeout = "10s"
+
+  ## Maximum allowed http request body size in bytes.
+  ## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
+  max_body_size = 0
 `
 
 func (t *HttpListener) SampleConfig() string {
@@ -63,6 +81,10 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
 	t.Lock()
 	defer t.Unlock()
 
+	if t.MaxBodySize == 0 {
+		t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX
+	}
+
 	t.acc = acc
 
 	var rawListener, err = net.Listen("tcp", t.ServiceAddress)
@@ -89,8 +111,6 @@ func (t *HttpListener) Stop() {
 	t.listener.Stop()
 	t.listener.Close()
 
-	t.wg.Wait()
-
 	log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
 }
@@ -113,58 +133,112 @@ func (t *HttpListener) httpListen() error {
 }
 
 func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
-	t.wg.Add(1)
-	defer t.wg.Done()
-
 	switch req.URL.Path {
 	case "/write":
-		var http400msg bytes.Buffer
+		var msg413 bytes.Buffer
+		var msg400 bytes.Buffer
 		defer func() {
-			if http400msg.Len() > 0 {
+			if msg413.Len() > 0 {
+				res.WriteHeader(http.StatusRequestEntityTooLarge)
+				res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg413.String())))
+			} else if msg400.Len() > 0 {
 				res.Header().Set("Content-Type", "application/json")
 				res.Header().Set("X-Influxdb-Version", "1.0")
 				res.WriteHeader(http.StatusBadRequest)
-				res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String())))
+				res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg400.String())))
 			} else {
 				res.WriteHeader(http.StatusNoContent)
 			}
 		}()
 
-		body := req.Body
-		if req.Header.Get("Content-Encoding") == "gzip" {
-			b, err := gzip.NewReader(req.Body)
-			if err != nil {
-				http400msg.WriteString(err.Error() + " ")
-				return
-			}
-			defer b.Close()
-			body = b
-		}
-
-		allocSize := 512
-		if req.ContentLength < MAX_REQUEST_BODY_SIZE {
-			allocSize = int(req.ContentLength)
-		}
-		buf := bytes.NewBuffer(make([]byte, 0, allocSize))
-		_, err := buf.ReadFrom(http.MaxBytesReader(res, body, MAX_REQUEST_BODY_SIZE))
-		if err != nil {
-			log.Printf("E! HttpListener unable to read request body. error: %s\n", err.Error())
-			http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error())
-			return
-		}
-
-		metrics, err := t.parser.Parse(buf.Bytes())
-		if err != nil {
-			log.Printf("E! HttpListener unable to parse metrics. error: %s \n", err.Error())
-			if len(metrics) == 0 {
-				http400msg.WriteString(err.Error())
-			} else {
-				http400msg.WriteString("partial write: " + err.Error())
-			}
-		}
-		for _, m := range metrics {
-			t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+		// Check that the content length is not too large for us to handle.
+		if req.ContentLength > t.MaxBodySize {
+			msg413.WriteString("http: request body too large")
+			return
+		}
+
+		// Handle gzip request bodies
+		var body io.ReadCloser
+		var err error
+		if req.Header.Get("Content-Encoding") == "gzip" {
+			body, err = gzip.NewReader(http.MaxBytesReader(res, req.Body, t.MaxBodySize))
+			if err != nil {
+				msg400.WriteString(err.Error() + " ")
+				return
+			}
+		} else {
+			body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
+		}
+
+		var buffer *bytes.Buffer
+		if req.ContentLength < MAX_ALLOCATION_SIZE {
+			// if the content length is less than the max allocation size, then
+			// read in the whole request at once:
+			buffer = bytes.NewBuffer(make([]byte, 0, req.ContentLength+1))
+			_, err := buffer.ReadFrom(body)
+			if err != nil {
+				msg := "E! "
+				if netErr, ok := err.(net.Error); ok {
+					if netErr.Timeout() {
+						msg += "Read timeout error, you may want to increase the read_timeout setting. "
+					}
+				}
+				log.Printf(msg + err.Error())
+				msg400.WriteString("Error reading request body: " + err.Error())
+				return
+			}
+		} else {
+			// If the body is larger than the max allocation size then set the
+			// maximum size of the buffer that we will allocate at a time.
+			// The following loop goes through the request body byte-by-byte.
+			// If there is a newline within 256 kilobytes of the end of the body
+			// we will attempt to parse metrics, reset the buffer, and continue.
+			buffer = bytes.NewBuffer(make([]byte, 0, MAX_ALLOCATION_SIZE))
+			reader := bufio.NewReader(body)
+			for {
+				b, err := reader.ReadByte()
+				if err != nil {
+					if err != io.EOF {
+						msg := "E! "
+						if netErr, ok := err.(net.Error); ok {
+							if netErr.Timeout() {
+								msg += "Read timeout error, you may want to increase the read_timeout setting. "
+							}
+						} else {
+							// if it's not an EOF or a net.Error, then it's almost certainly a
+							// tooLarge error coming from http.MaxBytesReader. It's unlikely
+							// that this code path will get hit because the client should
+							// be setting the ContentLength header, unless it's malicious.
+							msg413.WriteString(err.Error())
+						}
+						log.Printf(msg + err.Error())
+						return
+					}
+					break
+				}
+				// returned error is always nil:
+				// https://golang.org/pkg/bytes/#Buffer.WriteByte
+				buffer.WriteByte(b)
+				// if we have a newline and we're nearing the end of the buffer,
+				// do a write and continue with a fresh buffer.
+				if buffer.Len() > MAX_ALLOCATION_SIZE-256*1000 && b == '\n' {
+					t.parse(buffer.Bytes(), &msg400)
+					buffer.Reset()
+				} else if buffer.Len() == buffer.Cap() {
+					// we've reached the end of our buffer without finding a newline
+					// in the body, so we insert a newline here and attempt to parse.
+					if buffer.Len() == 0 {
+						continue
+					}
+					buffer.WriteByte('\n')
+					t.parse(buffer.Bytes(), &msg400)
+					buffer.Reset()
+				}
+			}
+		}
+		if buffer.Len() != 0 {
+			t.parse(buffer.Bytes(), &msg400)
 		}
 	case "/query":
 		// Deliver a dummy response to the query endpoint, as some InfluxDB
@@ -177,11 +251,25 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
 		// respond to ping requests
 		res.WriteHeader(http.StatusNoContent)
 	default:
+		// Don't know how to respond to calls to other endpoints
 		http.NotFound(res, req)
 	}
 }
 
+func (t *HttpListener) parse(b []byte, errmsg *bytes.Buffer) {
+	metrics, err := t.parser.Parse(b)
+	if err != nil {
+		if len(metrics) == 0 {
+			errmsg.WriteString(err.Error())
+		} else {
+			errmsg.WriteString("partial write: " + err.Error())
+		}
+	}
+
+	for _, m := range metrics {
+		t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+	}
+}
+
 func init() {
 	inputs.Add("http_listener", func() telegraf.Input {
 		return &HttpListener{}
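One layering detail in the diff above is worth calling out: gzip.NewReader wraps http.MaxBytesReader, not the other way around, so the size cap is enforced on the compressed bytes coming off the wire. A standalone sketch of the same layering (the handler, route, and maxBody value are illustrative, not the plugin's):

```go
package main

import (
	"compress/gzip"
	"io"
	"log"
	"net/http"
)

const maxBody = 1 << 20 // illustrative 1 MB cap

func handler(w http.ResponseWriter, r *http.Request) {
	// Cap the raw body first; the gzip reader then decompresses the
	// capped stream, mirroring the order used in the diff above.
	var body io.Reader = http.MaxBytesReader(w, r.Body, maxBody)
	if r.Header.Get("Content-Encoding") == "gzip" {
		gz, err := gzip.NewReader(body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		defer gz.Close()
		body = gz
	}
	n, err := io.Copy(io.Discard, body)
	if err != nil {
		// MaxBytesReader returns an error once more than maxBody bytes are read.
		http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
		return
	}
	log.Printf("read %d bytes", n)
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	http.HandleFunc("/write", handler)
	log.Fatal(http.ListenAndServe(":8186", nil))
}
```

Because the cap applies before decompression, a gzipped body that is under the limit on the wire can still decompress to considerably more than maxBody.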

plugins/inputs/http_listener/http_listener_test.go

@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
@@ -133,7 +134,11 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
 
 // writes 5000 metrics to the listener with 10 different writers
 func TestWriteHTTPHighBatchSize(t *testing.T) {
-	listener := &HttpListener{ServiceAddress: ":8287"}
+	listener := &HttpListener{
+		ServiceAddress: ":8287",
+		ReadTimeout:    internal.Duration{Duration: time.Second * 30},
+		WriteTimeout:   internal.Duration{Duration: time.Second * 30},
+	}
 
 	parser, _ := parsers.NewInfluxParser()
 	listener.SetParser(parser)
@@ -143,6 +148,11 @@ func TestWriteHTTPHighBatchSize(t *testing.T) {
 	time.Sleep(time.Millisecond * 25)
 
+	type result struct {
+		err  error
+		resp *http.Response
+	}
+	results := make(chan *result, 10)
 	// post many messages to listener
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {
@@ -150,12 +160,17 @@ func TestWriteHTTPHighBatchSize(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			resp, err := http.Post("http://localhost:8287/write?db=mydb", "", bytes.NewBuffer(makeMetricsBatch(5000)))
-			require.NoError(t, err)
-			require.EqualValues(t, 204, resp.StatusCode)
+			results <- &result{err: err, resp: resp}
 		}()
 	}
 	wg.Wait()
+	close(results)
+
+	for result := range results {
+		require.NoError(t, result.err)
+		require.EqualValues(t, 204, result.resp.StatusCode)
+	}
 
 	time.Sleep(time.Millisecond * 50)
 	listener.Gather(acc)
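The test change above funnels results through a buffered channel so that all assertions run on the test goroutine: require.NoError calls t.FailNow, which is only safe from the goroutine running the test. A generic, standard-library-only version of the pattern (the endpoint, worker count, and expected status are illustrative):

```go
package example

import (
	"net/http"
	"sync"
	"testing"
)

type postResult struct {
	err  error
	resp *http.Response
}

func TestConcurrentPosts(t *testing.T) {
	// Buffered to the worker count so sends never block.
	results := make(chan postResult, 10)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Workers only report results; they never call t.Fatal or
			// require.* themselves, since FailNow must not be called
			// from a goroutine other than the one running the test.
			resp, err := http.Get("http://localhost:8186/ping") // illustrative endpoint
			results <- postResult{err: err, resp: resp}
		}()
	}
	wg.Wait()
	close(results)

	// All assertions happen here, on the test goroutine.
	for r := range results {
		if r.err != nil {
			t.Errorf("request failed: %v", r.err)
			continue
		}
		if r.resp.StatusCode != http.StatusNoContent {
			t.Errorf("unexpected status: %d", r.resp.StatusCode)
		}
		r.resp.Body.Close()
	}
}
```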