Add formdata parser (#5749)

This commit is contained in:
Boris Yonchev
2019-06-17 23:34:54 +03:00
committed by Daniel Nelson
parent 1da81799cb
commit fd9abd2166
8 changed files with 551 additions and 26 deletions

View File

@@ -1,7 +1,7 @@
# HTTP Listener v2 Input Plugin
HTTP Listener v2 is a service input plugin that listens for metrics sent via
HTTP. Metrics may be sent in any supported [data format][data_format].
HTTP. Metrics may be sent in any supported [data format][data_format].
**Note:** The plugin previously known as `http_listener` has been renamed
`influxdb_listener`. If you would like Telegraf to act as a proxy/relay for
@@ -49,11 +49,17 @@ This is a sample configuration for the plugin.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Part of the request to consume.
## Available options are "body" and "query".
## Note that the data source and data format are independent properties.
## To consume standard query params and POST forms - use "formdata" as a data_format.
# data_source = "body"
```
### Metrics:
Metrics are created from the request body and are dependant on the value of `data_format`.
Metrics are collected from the part of the request specified by the `data_source` param and are parsed depending on the value of `data_format`.
### Troubleshooting:
@@ -67,5 +73,10 @@ curl -i -XPOST 'http://localhost:8080/telegraf' --data-binary 'cpu_load_short,ho
curl -i -XPOST 'http://localhost:8080/telegraf' --data-binary '{"value1": 42, "value2": 42}'
```
**Send query params**
```
curl -i -XGET 'http://localhost:8080/telegraf?host=server01&value=0.42'
```
[data_format]: /docs/DATA_FORMATS_INPUT.md
[influxdb_listener]: /plugins/inputs/influxdb_listener/README.md

View File

@@ -8,6 +8,8 @@ import (
"log"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
@@ -23,16 +25,25 @@ import (
// 500 MB
const defaultMaxBodySize = 500 * 1024 * 1024
const (
body = "body"
query = "query"
)
// TimeFunc provides a timestamp for the metrics
type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string
Path string
Methods []string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
Port int
DataSource string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
Port int
tlsint.ServerConfig
@@ -86,6 +97,12 @@ const sampleConfig = `
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Part of the request to consume.
## Available options are "body" and "query".
## Note that the data source and data format are independent properties.
## To consume standard query params and POST forms - use "formdata" as a data_format.
# data_source = "body"
`
func (h *HTTPListenerV2) SampleConfig() string {
@@ -164,11 +181,13 @@ func (h *HTTPListenerV2) Stop() {
}
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if req.URL.Path == h.Path {
h.AuthenticateIfSet(h.serveWrite, res, req)
} else {
h.AuthenticateIfSet(http.NotFound, res, req)
handler := h.serveWrite
if req.URL.Path != h.Path {
handler = http.NotFound
}
h.authenticateIfSet(handler, res, req)
}
func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
@@ -191,23 +210,17 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
return
}
// Handle gzip request bodies
body := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(req.Body)
if err != nil {
log.Println("D! " + err.Error())
badRequest(res)
return
}
defer body.Close()
var bytes []byte
var ok bool
switch strings.ToLower(h.DataSource) {
case query:
bytes, ok = h.collectQuery(res, req)
default:
bytes, ok = h.collectBody(res, req)
}
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
bytes, err := ioutil.ReadAll(body)
if err != nil {
tooLarge(res)
if !ok {
return
}
@@ -217,12 +230,52 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
badRequest(res)
return
}
for _, m := range metrics {
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
res.WriteHeader(http.StatusNoContent)
}
func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
body := req.Body
// Handle gzip request bodies
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(req.Body)
if err != nil {
log.Println("D! " + err.Error())
badRequest(res)
return nil, false
}
defer body.Close()
}
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
bytes, err := ioutil.ReadAll(body)
if err != nil {
tooLarge(res)
return nil, false
}
return bytes, true
}
func (h *HTTPListenerV2) collectQuery(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
rawQuery := req.URL.RawQuery
query, err := url.QueryUnescape(rawQuery)
if err != nil {
log.Println("D! " + err.Error())
badRequest(res)
return nil, false
}
return []byte(query), true
}
func tooLarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusRequestEntityTooLarge)
@@ -246,7 +299,7 @@ func badRequest(res http.ResponseWriter) {
res.Write([]byte(`{"error":"http: bad request"}`))
}
func (h *HTTPListenerV2) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
func (h *HTTPListenerV2) authenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
if h.BasicUsername != "" && h.BasicPassword != "" {
reqUsername, reqPassword, ok := req.BasicAuth()
if !ok ||
@@ -269,6 +322,7 @@ func init() {
TimeFunc: time.Now,
Path: "/telegraf",
Methods: []string{"POST", "PUT"},
DataSource: body,
}
})
}

File diff suppressed because one or more lines are too long