Add http_listener_v2 input input plugin (#4755)
This commit is contained in:
parent
65f7e988bb
commit
0bb264536f
|
@ -41,6 +41,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/http"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/http_listener"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/http_listener_v2"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/icinga2"
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
# Generic HTTP listener service input plugin
|
||||
|
||||
> NOTE: This is a new version of HTTP listener plugin.
|
||||
> This plugin supports all [data formats](/docs/DATA_FORMATS_INPUT.md) while the old [http_listener](/plugins/inputs/http_listener)
|
||||
> only accepts data in InfluxDB line-protocol only
|
||||
|
||||
The HTTP listener is a service input plugin that listens for messages sent via HTTP POST.
|
||||
|
||||
Enable TLS by specifying the file names of a service TLS certificate and key.
|
||||
|
||||
Enable mutually authenticated TLS and authorize client connections by signing certificate authority by including a list of allowed CA certificate file names in ````tls_allowed_cacerts````.
|
||||
|
||||
Enable basic HTTP authentication of clients by specifying a username and password to check for. These credentials will be received from the client _as plain text_ if TLS is not configured.
|
||||
|
||||
**Example:**
|
||||
```
|
||||
curl -i -XPOST 'http://localhost:8080/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
|
||||
```
|
||||
|
||||
### Configuration:
|
||||
|
||||
This is a sample configuration for the plugin.
|
||||
|
||||
```toml
|
||||
[[inputs.http_listener_v2]]
|
||||
## Address and port to host HTTP listener on
|
||||
service_address = ":8080"
|
||||
|
||||
## Path to listen to.
|
||||
path = "/telegraf"
|
||||
|
||||
## HTTP methods to accept.
|
||||
methods = ["POST", "PUT"]
|
||||
|
||||
## maximum duration before timing out read of the request
|
||||
read_timeout = "10s"
|
||||
## maximum duration before timing out write of the response
|
||||
write_timeout = "10s"
|
||||
|
||||
## Maximum allowed http request body size in bytes.
|
||||
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
|
||||
max_body_size = 0
|
||||
|
||||
## Set one or more allowed client CA certificate file names to
|
||||
## enable mutually authenticated TLS connections
|
||||
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||
|
||||
## Add service certificate and key
|
||||
tls_cert = "/etc/telegraf/cert.pem"
|
||||
tls_key = "/etc/telegraf/key.pem"
|
||||
|
||||
## Optional username and password to accept for HTTP basic authentication.
|
||||
## You probably want to make sure you have TLS configured above for this.
|
||||
basic_username = "foobar"
|
||||
basic_password = "barfoo"
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
```
|
|
@ -0,0 +1,274 @@
|
|||
package http_listener_v2
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
tlsint "github.com/influxdata/telegraf/internal/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
// defaultMaxBodySize is the default maximum request body size, in bytes.
|
||||
// if the request body is over this size, we will return an HTTP 413 error.
|
||||
// 500 MB
|
||||
const defaultMaxBodySize = 500 * 1024 * 1024
|
||||
|
||||
type TimeFunc func() time.Time
|
||||
|
||||
type HTTPListenerV2 struct {
|
||||
ServiceAddress string
|
||||
Path string
|
||||
Methods []string
|
||||
ReadTimeout internal.Duration
|
||||
WriteTimeout internal.Duration
|
||||
MaxBodySize int64
|
||||
Port int
|
||||
|
||||
tlsint.ServerConfig
|
||||
|
||||
BasicUsername string
|
||||
BasicPassword string
|
||||
|
||||
TimeFunc
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
listener net.Listener
|
||||
|
||||
parsers.Parser
|
||||
acc telegraf.Accumulator
|
||||
}
|
||||
|
||||
const sampleConfig = `
|
||||
## Address and port to host HTTP listener on
|
||||
service_address = ":8080"
|
||||
|
||||
## Path to listen to.
|
||||
path = "/telegraf"
|
||||
|
||||
## HTTP methods to accept.
|
||||
methods = ["POST", "PUT"]
|
||||
|
||||
## maximum duration before timing out read of the request
|
||||
read_timeout = "10s"
|
||||
## maximum duration before timing out write of the response
|
||||
write_timeout = "10s"
|
||||
|
||||
## Maximum allowed http request body size in bytes.
|
||||
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
|
||||
max_body_size = 0
|
||||
|
||||
## Set one or more allowed client CA certificate file names to
|
||||
## enable mutually authenticated TLS connections
|
||||
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||
|
||||
## Add service certificate and key
|
||||
tls_cert = "/etc/telegraf/cert.pem"
|
||||
tls_key = "/etc/telegraf/key.pem"
|
||||
|
||||
## Optional username and password to accept for HTTP basic authentication.
|
||||
## You probably want to make sure you have TLS configured above for this.
|
||||
basic_username = "foobar"
|
||||
basic_password = "barfoo"
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
`
|
||||
|
||||
func (h *HTTPListenerV2) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) Description() string {
|
||||
return "Generic HTTP write listener"
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) Gather(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) SetParser(parser parsers.Parser) {
|
||||
h.Parser = parser
|
||||
}
|
||||
|
||||
// Start starts the http listener service.
|
||||
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
|
||||
if h.MaxBodySize == 0 {
|
||||
h.MaxBodySize = defaultMaxBodySize
|
||||
}
|
||||
|
||||
if h.ReadTimeout.Duration < time.Second {
|
||||
h.ReadTimeout.Duration = time.Second * 10
|
||||
}
|
||||
if h.WriteTimeout.Duration < time.Second {
|
||||
h.WriteTimeout.Duration = time.Second * 10
|
||||
}
|
||||
|
||||
h.acc = acc
|
||||
|
||||
tlsConf, err := h.ServerConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
Addr: h.ServiceAddress,
|
||||
Handler: h,
|
||||
ReadTimeout: h.ReadTimeout.Duration,
|
||||
WriteTimeout: h.WriteTimeout.Duration,
|
||||
TLSConfig: tlsConf,
|
||||
}
|
||||
|
||||
var listener net.Listener
|
||||
if tlsConf != nil {
|
||||
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
|
||||
} else {
|
||||
listener, err = net.Listen("tcp", h.ServiceAddress)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.listener = listener
|
||||
h.Port = listener.Addr().(*net.TCPAddr).Port
|
||||
|
||||
h.wg.Add(1)
|
||||
go func() {
|
||||
defer h.wg.Done()
|
||||
server.Serve(h.listener)
|
||||
}()
|
||||
|
||||
log.Printf("I! Started HTTP listener V2 service on %s\n", h.ServiceAddress)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop cleans up all resources
|
||||
func (h *HTTPListenerV2) Stop() {
|
||||
h.listener.Close()
|
||||
h.wg.Wait()
|
||||
|
||||
log.Println("I! Stopped HTTP listener V2 service on ", h.ServiceAddress)
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||
if req.URL.Path == h.Path {
|
||||
h.AuthenticateIfSet(h.serveWrite, res, req)
|
||||
} else {
|
||||
h.AuthenticateIfSet(http.NotFound, res, req)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||
// Check that the content length is not too large for us to handle.
|
||||
if req.ContentLength > h.MaxBodySize {
|
||||
tooLarge(res)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the requested HTTP method was specified in config.
|
||||
isAcceptedMethod := false
|
||||
for _, method := range h.Methods {
|
||||
if req.Method == method {
|
||||
isAcceptedMethod = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isAcceptedMethod {
|
||||
methodNotAllowed(res)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle gzip request bodies
|
||||
body := req.Body
|
||||
if req.Header.Get("Content-Encoding") == "gzip" {
|
||||
var err error
|
||||
body, err = gzip.NewReader(req.Body)
|
||||
if err != nil {
|
||||
log.Println("D! " + err.Error())
|
||||
badRequest(res)
|
||||
return
|
||||
}
|
||||
defer body.Close()
|
||||
}
|
||||
|
||||
body = http.MaxBytesReader(res, body, h.MaxBodySize)
|
||||
bytes, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
tooLarge(res)
|
||||
return
|
||||
}
|
||||
|
||||
metrics, err := h.Parse(bytes)
|
||||
if err != nil {
|
||||
log.Println("D! " + err.Error())
|
||||
badRequest(res)
|
||||
return
|
||||
}
|
||||
for _, m := range metrics {
|
||||
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||
}
|
||||
res.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func tooLarge(res http.ResponseWriter) {
|
||||
res.Header().Set("Content-Type", "application/json")
|
||||
res.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||
res.Write([]byte(`{"error":"http: request body too large"}`))
|
||||
}
|
||||
|
||||
func methodNotAllowed(res http.ResponseWriter) {
|
||||
res.Header().Set("Content-Type", "application/json")
|
||||
res.WriteHeader(http.StatusMethodNotAllowed)
|
||||
res.Write([]byte(`{"error":"http: method not allowed"}`))
|
||||
}
|
||||
|
||||
func internalServerError(res http.ResponseWriter) {
|
||||
res.Header().Set("Content-Type", "application/json")
|
||||
res.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
func badRequest(res http.ResponseWriter) {
|
||||
res.Header().Set("Content-Type", "application/json")
|
||||
res.WriteHeader(http.StatusBadRequest)
|
||||
res.Write([]byte(`{"error":"http: bad request"}`))
|
||||
}
|
||||
|
||||
func (h *HTTPListenerV2) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
|
||||
if h.BasicUsername != "" && h.BasicPassword != "" {
|
||||
reqUsername, reqPassword, ok := req.BasicAuth()
|
||||
if !ok ||
|
||||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.BasicUsername)) != 1 ||
|
||||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.BasicPassword)) != 1 {
|
||||
|
||||
http.Error(res, "Unauthorized.", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
handler(res, req)
|
||||
} else {
|
||||
handler(res, req)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("http_listener_v2", func() telegraf.Input {
|
||||
return &HTTPListenerV2{
|
||||
ServiceAddress: ":8080",
|
||||
TimeFunc: time.Now,
|
||||
Path: "/telegraf",
|
||||
Methods: []string{"POST", "PUT"},
|
||||
}
|
||||
})
|
||||
}
|
File diff suppressed because one or more lines are too long
Binary file not shown.
Loading…
Reference in New Issue