http_listener: pre-allocate buffer for reading request body
Fixes: #1823 and #1856
This commit is contained in:
parent
a65447d22e
commit
4c670f719d
|
@ -1,12 +1,13 @@
|
||||||
package http_listener
|
package http_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -117,31 +118,52 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
switch req.URL.Path {
|
switch req.URL.Path {
|
||||||
case "/write":
|
case "/write":
|
||||||
var http400msg bytes.Buffer
|
var http400msg bytes.Buffer
|
||||||
var partial string
|
defer func() {
|
||||||
scanner := bufio.NewScanner(req.Body)
|
if http400msg.Len() > 0 {
|
||||||
scanner.Buffer([]byte(""), 128*1024)
|
res.Header().Set("Content-Type", "application/json")
|
||||||
for scanner.Scan() {
|
res.Header().Set("X-Influxdb-Version", "1.0")
|
||||||
metrics, err := t.parser.Parse(scanner.Bytes())
|
res.WriteHeader(http.StatusBadRequest)
|
||||||
if err == nil {
|
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String())))
|
||||||
for _, m := range metrics {
|
}
|
||||||
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
}()
|
||||||
}
|
|
||||||
partial = "partial write: "
|
body := req.Body
|
||||||
} else {
|
if req.Header.Get("Content-Encoding") == "gzip" {
|
||||||
|
b, err := gzip.NewReader(req.Body)
|
||||||
|
if err != nil {
|
||||||
http400msg.WriteString(err.Error() + " ")
|
http400msg.WriteString(err.Error() + " ")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer b.Close()
|
||||||
|
body = b
|
||||||
|
}
|
||||||
|
|
||||||
|
var bs []byte
|
||||||
|
if cl := req.Header.Get("Content-length"); cl != "" {
|
||||||
|
if l, err := strconv.Atoi(cl); err == nil {
|
||||||
|
bs = make([]byte, 0, l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
buf := bytes.NewBuffer(bs)
|
||||||
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
|
_, err := buf.ReadFrom(body)
|
||||||
} else if http400msg.Len() > 0 {
|
if err != nil {
|
||||||
res.Header().Set("Content-Type", "application/json")
|
log.Printf("E! HttpListener unable to read request body. data: [%s], error: %s\n", string(buf.Bytes()), err.Error())
|
||||||
res.Header().Set("X-Influxdb-Version", "1.0")
|
http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error())
|
||||||
res.WriteHeader(http.StatusBadRequest)
|
return
|
||||||
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
|
|
||||||
} else {
|
|
||||||
res.WriteHeader(http.StatusNoContent)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics, err := t.parser.Parse(buf.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! HttpListener unable to parse metrics. data: [%s], error: %s \n", string(buf.Bytes()), err.Error())
|
||||||
|
http400msg.WriteString("Error while parsing metrics: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range metrics {
|
||||||
|
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||||
|
}
|
||||||
|
res.WriteHeader(http.StatusNoContent)
|
||||||
case "/query":
|
case "/query":
|
||||||
// Deliver a dummy response to the query endpoint, as some InfluxDB
|
// Deliver a dummy response to the query endpoint, as some InfluxDB
|
||||||
// clients test endpoint availability with a query
|
// clients test endpoint availability with a query
|
||||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue