diff --git a/plugins/inputs/http_listener/README.md b/plugins/inputs/http_listener/README.md index 9643f6a2e..61d4bbfb0 100644 --- a/plugins/inputs/http_listener/README.md +++ b/plugins/inputs/http_listener/README.md @@ -3,9 +3,11 @@ The HTTP listener is a service input plugin that listens for messages sent via HTTP POST. The plugin expects messages in the InfluxDB line-protocol ONLY, other Telegraf input data formats are not supported. The intent of the plugin is to allow Telegraf to serve as a proxy/router for the /write endpoint of the InfluxDB HTTP API. -When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database. +When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. +The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database. See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx). + Example: curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000' ### Configuration: @@ -19,6 +21,12 @@ This is a sample configuration for the plugin. service_address = ":8186" ## timeouts + ## maximum duration before timing out read of the request read_timeout = "10s" + ## maximum duration before timing out write of the response write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte) + max_body_size = 0 ``` diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 7f4d4ad09..e7133cbb3 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -1,9 +1,11 @@ package http_listener import ( + "bufio" "bytes" "compress/gzip" "fmt" + "io" "log" "net" "net/http" @@ -17,15 +19,25 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" ) -const MAX_REQUEST_BODY_SIZE = 50 * 1024 * 1024 +const ( + // DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes. + // if the request body is over this size, we will return an HTTP 413 error. + // 1 GB + DEFAULT_REQUEST_BODY_MAX = 1 * 1000 * 1000 * 1000 + + // MAX_ALLOCATION_SIZE is the maximum size, in bytes, of a single allocation + // of bytes that will be made handling a single HTTP request. + // 15 MB + MAX_ALLOCATION_SIZE = 10 * 1000 * 1000 +) type HttpListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration + MaxBodySize int64 sync.Mutex - wg sync.WaitGroup listener *stoppableListener.StoppableListener @@ -38,8 +50,14 @@ const sampleConfig = ` service_address = ":8186" ## timeouts + ## maximum duration before timing out read of the request read_timeout = "10s" + ## maximum duration before timing out write of the response write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte) + max_body_size = 0 ` func (t *HttpListener) SampleConfig() string { @@ -63,6 +81,10 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + if t.MaxBodySize == 0 { + t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX + } + t.acc = acc var rawListener, err = net.Listen("tcp", t.ServiceAddress) @@ -89,8 +111,6 @@ func (t *HttpListener) Stop() { t.listener.Stop() t.listener.Close() - t.wg.Wait() - log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress) } @@ -113,58 +133,112 @@ func (t *HttpListener) httpListen() error { } func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { - t.wg.Add(1) - defer t.wg.Done() - switch req.URL.Path { case "/write": - var http400msg bytes.Buffer + var msg413 bytes.Buffer + var msg400 bytes.Buffer defer func() { - if http400msg.Len() > 0 { + if msg413.Len() > 0 { + res.WriteHeader(http.StatusRequestEntityTooLarge) + res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg413.String()))) + } else if msg400.Len() > 0 { res.Header().Set("Content-Type", "application/json") res.Header().Set("X-Influxdb-Version", "1.0") res.WriteHeader(http.StatusBadRequest) - res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String()))) + res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg400.String()))) } else { res.WriteHeader(http.StatusNoContent) } }() - body := req.Body - if req.Header.Get("Content-Encoding") == "gzip" { - b, err := gzip.NewReader(req.Body) - if err != nil { - http400msg.WriteString(err.Error() + " ") - return - } - defer b.Close() - body = b - } - - allocSize := 512 - if req.ContentLength < MAX_REQUEST_BODY_SIZE { - allocSize = int(req.ContentLength) - } - buf := bytes.NewBuffer(make([]byte, 0, allocSize)) - _, err := buf.ReadFrom(http.MaxBytesReader(res, body, MAX_REQUEST_BODY_SIZE)) - if err != nil { - log.Printf("E! HttpListener unable to read request body. error: %s\n", err.Error()) - http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error()) + // Check that the content length is not too large for us to handle. + if req.ContentLength > t.MaxBodySize { + msg413.WriteString("http: request body too large") return } - metrics, err := t.parser.Parse(buf.Bytes()) - if err != nil { - log.Printf("E! HttpListener unable to parse metrics. error: %s \n", err.Error()) - if len(metrics) == 0 { - http400msg.WriteString(err.Error()) - } else { - http400msg.WriteString("partial write: " + err.Error()) + // Handle gzip request bodies + var body io.ReadCloser + var err error + if req.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(http.MaxBytesReader(res, req.Body, t.MaxBodySize)) + if err != nil { + msg400.WriteString(err.Error() + " ") + return + } + } else { + body = http.MaxBytesReader(res, req.Body, t.MaxBodySize) + } + + var buffer *bytes.Buffer + if req.ContentLength < MAX_ALLOCATION_SIZE { + // if the content length is less than the max allocation size, then + // read in the whole request at once: + buffer = bytes.NewBuffer(make([]byte, 0, req.ContentLength+1)) + _, err := buffer.ReadFrom(body) + if err != nil { + msg := "E! " + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + msg += "Read timeout error, you may want to increase the read_timeout setting. " + } + } + log.Printf(msg + err.Error()) + msg400.WriteString("Error reading request body: " + err.Error()) + return + } + } else { + // If the body is larger than the max allocation size then set the + // maximum size of the buffer that we will allocate at a time. + // The following loop goes through the request body byte-by-byte. + // If there is a newline within 256 kilobytes of the end of the body + // we will attempt to parse metrics, reset the buffer, and continue. + buffer = bytes.NewBuffer(make([]byte, 0, MAX_ALLOCATION_SIZE)) + reader := bufio.NewReader(body) + for { + b, err := reader.ReadByte() + if err != nil { + if err != io.EOF { + msg := "E! " + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + msg += "Read timeout error, you may want to increase the read_timeout setting. " + } + } else { + // if it's not an EOF or a net.Error, then it's almost certainly a + // tooLarge error coming from http.MaxBytesReader. It's unlikely + // that this code path will get hit because the client should + // be setting the ContentLength header, unless it's malicious. + msg413.WriteString(err.Error()) + } + log.Printf(msg + err.Error()) + return + } + break + } + // returned error is always nil: + // https://golang.org/pkg/bytes/#Buffer.WriteByte + buffer.WriteByte(b) + // if we have a newline and we're nearing the end of the buffer, + // do a write and continue with a fresh buffer. + if buffer.Len() > MAX_ALLOCATION_SIZE-256*1000 && b == '\n' { + t.parse(buffer.Bytes(), &msg400) + buffer.Reset() + } else if buffer.Len() == buffer.Cap() { + // we've reached the end of our buffer without finding a newline + // in the body, so we insert a newline here and attempt to parse. + if buffer.Len() == 0 { + continue + } + buffer.WriteByte('\n') + t.parse(buffer.Bytes(), &msg400) + buffer.Reset() + } } } - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + if buffer.Len() != 0 { + t.parse(buffer.Bytes(), &msg400) } case "/query": // Deliver a dummy response to the query endpoint, as some InfluxDB @@ -177,11 +251,25 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { // respond to ping requests res.WriteHeader(http.StatusNoContent) default: - // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } } +func (t *HttpListener) parse(b []byte, errmsg *bytes.Buffer) { + metrics, err := t.parser.Parse(b) + if err != nil { + if len(metrics) == 0 { + errmsg.WriteString(err.Error()) + } else { + errmsg.WriteString("partial write: " + err.Error()) + } + } + + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + func init() { inputs.Add("http_listener", func() telegraf.Input { return &HttpListener{} diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 9da222441..e47f6573d 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -133,7 +134,11 @@ func TestWriteHTTPHighTraffic(t *testing.T) { // // writes 5000 metrics to the listener with 10 different writers func TestWriteHTTPHighBatchSize(t *testing.T) { - listener := &HttpListener{ServiceAddress: ":8287"} + listener := &HttpListener{ + ServiceAddress: ":8287", + ReadTimeout: internal.Duration{Duration: time.Second * 30}, + WriteTimeout: internal.Duration{Duration: time.Second * 30}, + } parser, _ := parsers.NewInfluxParser() listener.SetParser(parser) @@ -143,6 +148,11 @@ func TestWriteHTTPHighBatchSize(t *testing.T) { time.Sleep(time.Millisecond * 25) + type result struct { + err error + resp *http.Response + } + results := make(chan *result, 10) // post many messages to listener var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -150,12 +160,17 @@ func TestWriteHTTPHighBatchSize(t *testing.T) { go func() { defer wg.Done() resp, err := http.Post("http://localhost:8287/write?db=mydb", "", bytes.NewBuffer(makeMetricsBatch(5000))) - require.NoError(t, err) - require.EqualValues(t, 204, resp.StatusCode) + results <- &result{err: err, resp: resp} }() } wg.Wait() + close(results) + for result := range results { + require.NoError(t, result.err) + require.EqualValues(t, 204, result.resp.StatusCode) + } + time.Sleep(time.Millisecond * 50) listener.Gather(acc)