Fixup http write handler changes
This commit is contained in:
parent
4c670f719d
commit
c73964c12d
|
@ -7,7 +7,6 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -18,6 +17,8 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
const MAX_REQUEST_BODY_SIZE = 50 * 1024 * 1024
|
||||
|
||||
type HttpListener struct {
|
||||
ServiceAddress string
|
||||
ReadTimeout internal.Duration
|
||||
|
@ -124,6 +125,8 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
|||
res.Header().Set("X-Influxdb-Version", "1.0")
|
||||
res.WriteHeader(http.StatusBadRequest)
|
||||
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String())))
|
||||
} else {
|
||||
res.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -138,32 +141,31 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
|||
body = b
|
||||
}
|
||||
|
||||
var bs []byte
|
||||
if cl := req.Header.Get("Content-length"); cl != "" {
|
||||
if l, err := strconv.Atoi(cl); err == nil {
|
||||
bs = make([]byte, 0, l)
|
||||
allocSize := 512
|
||||
if req.ContentLength < MAX_REQUEST_BODY_SIZE {
|
||||
allocSize = int(req.ContentLength)
|
||||
}
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(bs)
|
||||
_, err := buf.ReadFrom(body)
|
||||
buf := bytes.NewBuffer(make([]byte, 0, allocSize))
|
||||
_, err := buf.ReadFrom(http.MaxBytesReader(res, body, MAX_REQUEST_BODY_SIZE))
|
||||
if err != nil {
|
||||
log.Printf("E! HttpListener unable to read request body. data: [%s], error: %s\n", string(buf.Bytes()), err.Error())
|
||||
log.Printf("E! HttpListener unable to read request body. error: %s\n", err.Error())
|
||||
http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
metrics, err := t.parser.Parse(buf.Bytes())
|
||||
if err != nil {
|
||||
log.Printf("E! HttpListener unable to parse metrics. data: [%s], error: %s \n", string(buf.Bytes()), err.Error())
|
||||
http400msg.WriteString("Error while parsing metrics: " + err.Error())
|
||||
return
|
||||
log.Printf("E! HttpListener unable to parse metrics. error: %s \n", err.Error())
|
||||
if len(metrics) == 0 {
|
||||
http400msg.WriteString(err.Error())
|
||||
} else {
|
||||
http400msg.WriteString("partial write: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range metrics {
|
||||
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||
}
|
||||
res.WriteHeader(http.StatusNoContent)
|
||||
case "/query":
|
||||
// Deliver a dummy response to the query endpoint, as some InfluxDB
|
||||
// clients test endpoint availability with a query
|
||||
|
|
|
@ -24,6 +24,14 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
// parse even if the buffer begins with a newline
|
||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
||||
points, err := models.ParsePoints(buf)
|
||||
|
||||
if err != nil {
|
||||
if len(err.Error()) > 1024 {
|
||||
err = fmt.Errorf("Error parsing influx line-protocol (error truncated): %s",
|
||||
err.Error()[0:256])
|
||||
}
|
||||
}
|
||||
|
||||
metrics := make([]telegraf.Metric, len(points))
|
||||
for i, point := range points {
|
||||
for k, v := range p.DefaultTags {
|
||||
|
|
Loading…
Reference in New Issue