From c73964c12deb78ffaa4c8036f8a7dce1e7187666 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 13 Oct 2016 16:36:35 +0100 Subject: [PATCH] Fixup http write handler changes --- plugins/inputs/http_listener/http_listener.go | 30 ++++++++++--------- plugins/parsers/influx/parser.go | 8 +++++ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index b5617bb8a..7f4d4ad09 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -7,7 +7,6 @@ import ( "log" "net" "net/http" - "strconv" "sync" "time" @@ -18,6 +17,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" ) +const MAX_REQUEST_BODY_SIZE = 50 * 1024 * 1024 + type HttpListener struct { ServiceAddress string ReadTimeout internal.Duration @@ -124,6 +125,8 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.Header().Set("X-Influxdb-Version", "1.0") res.WriteHeader(http.StatusBadRequest) res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String()))) + } else { + res.WriteHeader(http.StatusNoContent) } }() @@ -138,32 +141,31 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { body = b } - var bs []byte - if cl := req.Header.Get("Content-length"); cl != "" { - if l, err := strconv.Atoi(cl); err == nil { - bs = make([]byte, 0, l) - } + allocSize := 512 + if req.ContentLength < MAX_REQUEST_BODY_SIZE { + allocSize = int(req.ContentLength) } - - buf := bytes.NewBuffer(bs) - _, err := buf.ReadFrom(body) + buf := bytes.NewBuffer(make([]byte, 0, allocSize)) + _, err := buf.ReadFrom(http.MaxBytesReader(res, body, MAX_REQUEST_BODY_SIZE)) if err != nil { - log.Printf("E! HttpListener unable to read request body. data: [%s], error: %s\n", string(buf.Bytes()), err.Error()) + log.Printf("E! HttpListener unable to read request body. error: %s\n", err.Error()) http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error()) return } metrics, err := t.parser.Parse(buf.Bytes()) if err != nil { - log.Printf("E! HttpListener unable to parse metrics. data: [%s], error: %s \n", string(buf.Bytes()), err.Error()) - http400msg.WriteString("Error while parsing metrics: " + err.Error()) - return + log.Printf("E! HttpListener unable to parse metrics. error: %s \n", err.Error()) + if len(metrics) == 0 { + http400msg.WriteString(err.Error()) + } else { + http400msg.WriteString("partial write: " + err.Error()) + } } for _, m := range metrics { t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } - res.WriteHeader(http.StatusNoContent) case "/query": // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 68b7497fe..e5ae7c7c2 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -24,6 +24,14 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) points, err := models.ParsePoints(buf) + + if err != nil { + if len(err.Error()) > 1024 { + err = fmt.Errorf("Error parsing influx line-protocol (error truncated): %s", + err.Error()[0:256]) + } + } + metrics := make([]telegraf.Metric, len(points)) for i, point := range points { for k, v := range p.DefaultTags {