TLS and MTLS enhancements to HTTPListener input plugin (#3191)

This commit is contained in:
Jeff Nickoloff 2017-09-08 16:01:16 -07:00 committed by Daniel Nelson
parent 247c2e71fd
commit c809debfd4
3 changed files with 357 additions and 41 deletions

View File

@ -8,6 +8,10 @@ The `/write` endpoint supports the `precision` query parameter and can be set to
When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database. When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.
Enable TLS by specifying the file names of a service TLS certificate and key.
Enable mutually authenticated TLS and authorize client connections by signing certificate authority by including a list of allowed CA certificate file names in ````tls_allowed_cacerts````.
See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx). See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).
**Example:** **Example:**
@ -28,4 +32,11 @@ This is a sample configuration for the plugin.
## timeouts ## timeouts
read_timeout = "10s" read_timeout = "10s"
write_timeout = "10s" write_timeout = "10s"
## HTTPS
tls_cert= "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
## MTLS
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
``` ```

View File

@ -3,7 +3,10 @@ package http_listener
import ( import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"crypto/tls"
"crypto/x509"
"io" "io"
"io/ioutil"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -37,6 +40,10 @@ type HTTPListener struct {
MaxLineSize int MaxLineSize int
Port int Port int
TlsAllowedCacerts []string
TlsCert string
TlsKey string
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
@ -75,6 +82,14 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes. ## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes) ## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = 0 max_line_size = 0
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Add service certificate and key
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
` `
func (h *HTTPListener) SampleConfig() string { func (h *HTTPListener) SampleConfig() string {
@ -117,10 +132,33 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.MaxLineSize = DEFAULT_MAX_LINE_SIZE h.MaxLineSize = DEFAULT_MAX_LINE_SIZE
} }
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}
h.acc = acc h.acc = acc
h.pool = NewPool(200, h.MaxLineSize) h.pool = NewPool(200, h.MaxLineSize)
var listener, err = net.Listen("tcp", h.ServiceAddress) tlsConf := h.getTLSConfig()
server := &http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
TLSConfig: tlsConf,
}
var err error
var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
}
if err != nil { if err != nil {
return err return err
} }
@ -130,7 +168,7 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.wg.Add(1) h.wg.Add(1)
go func() { go func() {
defer h.wg.Done() defer h.wg.Done()
h.httpListen() server.Serve(h.listener)
}() }()
log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress) log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress)
@ -149,27 +187,6 @@ func (h *HTTPListener) Stop() {
log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress) log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress)
} }
// httpListen sets up an http.Server and calls server.Serve.
// like server.Serve, httpListen will always return a non-nil error, for this
// reason, the error returned should probably be ignored.
// see https://golang.org/pkg/net/http/#Server.Serve
func (h *HTTPListener) httpListen() error {
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}
var server = http.Server{
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
}
return server.Serve(h.listener)
}
func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.RequestsRecv.Incr(1) h.RequestsRecv.Incr(1)
defer h.RequestsServed.Incr(1) defer h.RequestsServed.Incr(1)
@ -327,6 +344,38 @@ func badRequest(res http.ResponseWriter) {
res.Write([]byte(`{"error":"http: bad request"}`)) res.Write([]byte(`{"error":"http: bad request"}`))
} }
func (h *HTTPListener) getTLSConfig() *tls.Config {
tlsConf := &tls.Config{
InsecureSkipVerify: false,
Renegotiation: tls.RenegotiateNever,
}
if len(h.TlsCert) == 0 || len(h.TlsKey) == 0 {
return nil
}
cert, err := tls.LoadX509KeyPair(h.TlsCert, h.TlsKey)
if err != nil {
return nil
}
tlsConf.Certificates = []tls.Certificate{cert}
if h.TlsAllowedCacerts != nil {
tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
clientPool := x509.NewCertPool()
for _, ca := range h.TlsAllowedCacerts {
c, err := ioutil.ReadFile(ca)
if err != nil {
continue
}
clientPool.AppendCertsFromPEM(c)
}
tlsConf.ClientCAs = clientPool
}
return tlsConf
}
func init() { func init() {
inputs.Add("http_listener", func() telegraf.Input { inputs.Add("http_listener", func() telegraf.Input {
return &HTTPListener{ return &HTTPListener{

View File

@ -2,6 +2,9 @@ package http_listener
import ( import (
"bytes" "bytes"
"crypto/tls"
"crypto/x509"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -29,6 +32,150 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
badMsg = "blahblahblah: 42\n" badMsg = "blahblahblah: 42\n"
emptyMsg = "" emptyMsg = ""
serviceRootPEM = `-----BEGIN CERTIFICATE-----
MIIDRTCCAi2gAwIBAgIUenakcvMDj2URxBvUHBe0Mfhac0cwDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAxMQdGVsZWdyYWYtdGVzdC1jYTAeFw0xNzA4MzEwNTE5NDNa
Fw0yNzA4MjkwNTIwMTNaMBsxGTAXBgNVBAMTEHRlbGVncmFmLXRlc3QtY2EwggEi
MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDxpDlUEC6LNXQMhvTtlWKUekwa
xh2OaiR16WvO8iA+sYmjlpFXOe+V6YWT+daOGujCqlGdrfDjj3C3pqFPJ6Q4VXaA
xQyd0Ena7kRtuQ/IUSpTWxyrpSIzKL3dAoV0NYpjFWznjVMP3Rq4l+4cHqviZSvK
bWUK5n0vBGpEw3A22V9urhlSNkSbECvzn9EFHyIeJX603zaKXYw5wiDwCp1swbXW
2WS2h45JeI5xrpKcFmLaqRNe0swi6bkGnmefyCv7nsbOLeKyEW9AExDSd6nSLdu9
TGzhAfnfodcajSmKiQ+7YL9JY1bQ9hlfXk1ULg4riSEMKF+trZFZUanaXeeBAgMB
AAGjgYAwfjAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
FgQUiPkCD8gEsSgIiV8jzACMoUZcHaIwHwYDVR0jBBgwFoAUiPkCD8gEsSgIiV8j
zACMoUZcHaIwGwYDVR0RBBQwEoIQdGVsZWdyYWYtdGVzdC1jYTANBgkqhkiG9w0B
AQsFAAOCAQEAXeadR7ZVkb2C0F8OEd2CQxVt2/JOqM4G2N2O8uTwf+hIn+qm+jbb
Q6JokGhr5Ybhvtv3U9JnI6RVI+TOYNkDzs5e2DtntFQmcKb2c+y5Z+OpvWd13ObK
GMCs4bho6O7h1qo1Z+Ftd6sYQ7JL0MuTGWCNbXv2c1iC4zPT54n1vGZC5so08RO0
r7bqLLEnkSawabvSAeTxtweCXJUw3D576e0sb8oU0AP/Hn/2IC9E1vFZdjDswEfs
ARE4Oc5XnN6sqjtp0q5CqPpW6tYFwxdtZFk0VYPXyRnETVgry7Dc/iX6mktIYUx+
qWSyPEDKALyxx6yUyVDqgcY2VUm0rM/1Iw==
-----END CERTIFICATE-----`
serviceCertPEM = `-----BEGIN CERTIFICATE-----
MIIDKjCCAhKgAwIBAgIUVYjQKruuFavlMZvV7X6RRF4OyBowDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAxMQdGVsZWdyYWYtdGVzdC1jYTAeFw0xNzA4MzEwNTM3MjRa
Fw0xNzA5MzAwNTM3NTRaMBQxEjAQBgNVBAMTCWxvY2FsaG9zdDCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBANojLHm+4ttLfl8xo4orZ436/o36wdQ30sWz
xE8eGejhARvCSNIR1Tau41Towq/MQVQQejQJRgqBSz7UEfzJNJGKKKc560j6fmTM
FHpFNZcTrNrTb0r3blUWF1oswhTgg313OXbVsz+E9tHkT1p/s9uURy3TJ3O/CFHq
2vTiTQMTq31v0FEN1E/d6uzMhnGy5+QuRu/0A2iPpgXgPopYZwG5t4hN1KklM//l
j2gMlX6mAYalctFOkDbhIe4/4dQcfT0sWA49KInZmUeB1RdyiNfCoXnDRZHocPIj
ltYAK/Igda0fdlMisoqh2ZMrCt8yhws7ycc12cFi7ZMv8zvi5p8CAwEAAaNtMGsw
EwYDVR0lBAwwCgYIKwYBBQUHAwEwHQYDVR0OBBYEFCdE87Nz7vPpgRmj++6J8rQR
0F/TMB8GA1UdIwQYMBaAFIj5Ag/IBLEoCIlfI8wAjKFGXB2iMBQGA1UdEQQNMAuC
CWxvY2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAIPhMYCsCPvOcvLLkahaZVn2g
ZbzPDplFhEsH1cpc7vd3GCV2EYjNTbBTDs5NlovSbJLf1DFB+gwsfEjhlFVZB3UQ
6GtuA5CQh/Skv8ngCDiLP50BbKF0CLa4Ia0xrSTAyRsg2rt9APphbej0yKqJ7j8U
1KK6rjOSnuzrKseex26VVovjPFq0FgkghWRm0xrAeizGTBCSEStZEPhk3pBo2x95
a32VPpmhlQMDyiV6m1cc9/MfxMisnyeLqJl8E9nziNa4/BgwwN9DcOp63D9OOa6A
brtLz8OXqvV+7gKlq+nASFDimXwFKRyqRH6ECyHNTE2K14KZb7+JTa0AUm6Nlw==
-----END CERTIFICATE-----`
serviceKeyPEM = `-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA2iMseb7i20t+XzGjiitnjfr+jfrB1DfSxbPETx4Z6OEBG8JI
0hHVNq7jVOjCr8xBVBB6NAlGCoFLPtQR/Mk0kYoopznrSPp+ZMwUekU1lxOs2tNv
SvduVRYXWizCFOCDfXc5dtWzP4T20eRPWn+z25RHLdMnc78IUera9OJNAxOrfW/Q
UQ3UT93q7MyGcbLn5C5G7/QDaI+mBeA+ilhnAbm3iE3UqSUz/+WPaAyVfqYBhqVy
0U6QNuEh7j/h1Bx9PSxYDj0oidmZR4HVF3KI18KhecNFkehw8iOW1gAr8iB1rR92
UyKyiqHZkysK3zKHCzvJxzXZwWLtky/zO+LmnwIDAQABAoIBABD8MidcrK9kpndl
FxXYIV0V0SJfBx6uJhRM1hlO/7d5ZauyqhbpWo/CeGMRKK+lmOShz9Ijcre4r5I5
0xi61gQLHPVAdkidcKAKoAGRSAX2ezwiwIS21Xl8md7ko0wa20I2uVu+chGdGdbo
DyG91dRgLFauHWFO26f9QIVW5aY6ifyjg1fyxR/9n2YZfkqbjvASW4Mmfv5GR1aT
mffajgsquy78PKs86f879iG+cfCzPYdoK+h7fsm4EEqDwK8JCsUIY1qN+Tuj5RQY
zuIuD34+wywe7Jd1vwjQ40Cyilgtnu8Q8s8J05bXrD3mqer5nrqIGOX0vKgs+EXx
1hV+6ZECgYEA+950L2u8oPzNXu9BAL8Y5Tl384qj1+Cj/g28MuZFoPf/KU0HRN6l
PBlXKaGP9iX+749tdiNPk5keIwOL8xCVXOpMLOA/jOlGODydG9rX67WCL/R1RcJR
+Pip8dxO1ZNpOKHud06XLMuuVz9qNq0Xwb1VCzNTOxnEDwtXNyDm6OkCgYEA3bcW
hMeDNn85UA4n0ljcdCmEu12WS7L//jaAOWuPPfM3GgKEIic6hqrPWEuZlZtQnybx
L6qQgaWyCfl/8z0+5WynQqkVPz1j1dSrSKnemxyeySrmUcOH5UJfAKaG5PUd7H3t
oPTCxkbW3Bi2QLlgd4nb7+OEk6w0V9Zzv4AFHkcCgYBL/aD2Ub4WoE9iLjNhg0aC
mmUrcI/gaSFxXDmE7d7iIxC0KE5iI/6cdFTM9bbWoD4bjx2KgDrZIGBsVfyaeE1o
PDSBcaMa46LRAtCv/8YXkqrVxx6+zlMnF/dGRp7uZ0xeztSA4JBR7p4KKtLj7jN1
u6b1+yVIdoylsVk+A8pHSQKBgQCUcsn5DTyleHl/SHsRM74naMUeToMbHDaalxMz
XvkBmZ8DIzwlQe7FzAgYLkYfDWblqMVEDQfERpT2aL9qtU8vfZhf4aYAObJmsYYd
mN8bLAaE2txrUmfi8JV7cgRPuG7YsVgxtK/U4glqRIGCxJv6bat86vERjvNc/JFz
XtwOcQKBgF83Ov+EA9pL0AFs+BMiO+0SX/OqLX0TDMSqUKg3jjVfgl+BSBEZIsOu
g5jqHBx3Om/UyrXdn+lnMhyEgCuNkeC6057B5iGcWucTlnINeejXk/pnbvMtGjD1
OGWmdXhgLtKg6Edqm+9fnH0UJN6DRxRRCUfzMfbY8TRmLzZG2W34
-----END RSA PRIVATE KEY-----`
clientRootPEM = `-----BEGIN CERTIFICATE-----
MIIDRTCCAi2gAwIBAgIUenakcvMDj2URxBvUHBe0Mfhac0cwDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAxMQdGVsZWdyYWYtdGVzdC1jYTAeFw0xNzA4MzEwNTE5NDNa
Fw0yNzA4MjkwNTIwMTNaMBsxGTAXBgNVBAMTEHRlbGVncmFmLXRlc3QtY2EwggEi
MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDxpDlUEC6LNXQMhvTtlWKUekwa
xh2OaiR16WvO8iA+sYmjlpFXOe+V6YWT+daOGujCqlGdrfDjj3C3pqFPJ6Q4VXaA
xQyd0Ena7kRtuQ/IUSpTWxyrpSIzKL3dAoV0NYpjFWznjVMP3Rq4l+4cHqviZSvK
bWUK5n0vBGpEw3A22V9urhlSNkSbECvzn9EFHyIeJX603zaKXYw5wiDwCp1swbXW
2WS2h45JeI5xrpKcFmLaqRNe0swi6bkGnmefyCv7nsbOLeKyEW9AExDSd6nSLdu9
TGzhAfnfodcajSmKiQ+7YL9JY1bQ9hlfXk1ULg4riSEMKF+trZFZUanaXeeBAgMB
AAGjgYAwfjAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
FgQUiPkCD8gEsSgIiV8jzACMoUZcHaIwHwYDVR0jBBgwFoAUiPkCD8gEsSgIiV8j
zACMoUZcHaIwGwYDVR0RBBQwEoIQdGVsZWdyYWYtdGVzdC1jYTANBgkqhkiG9w0B
AQsFAAOCAQEAXeadR7ZVkb2C0F8OEd2CQxVt2/JOqM4G2N2O8uTwf+hIn+qm+jbb
Q6JokGhr5Ybhvtv3U9JnI6RVI+TOYNkDzs5e2DtntFQmcKb2c+y5Z+OpvWd13ObK
GMCs4bho6O7h1qo1Z+Ftd6sYQ7JL0MuTGWCNbXv2c1iC4zPT54n1vGZC5so08RO0
r7bqLLEnkSawabvSAeTxtweCXJUw3D576e0sb8oU0AP/Hn/2IC9E1vFZdjDswEfs
ARE4Oc5XnN6sqjtp0q5CqPpW6tYFwxdtZFk0VYPXyRnETVgry7Dc/iX6mktIYUx+
qWSyPEDKALyxx6yUyVDqgcY2VUm0rM/1Iw==
-----END CERTIFICATE-----`
clientCertPEM = `-----BEGIN CERTIFICATE-----
MIIDMDCCAhigAwIBAgIUIVOF5g2zH6+J/dbGdu4q18aSJoMwDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAxMQdGVsZWdyYWYtdGVzdC1jYTAeFw0xNzA4MzEwNTQ1MzJa
Fw0yNzA4MjUwMTQ2MDJaMBcxFTATBgNVBAMTDGR1bW15LWNsaWVudDCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBAKok1HJ40buyjrS+DG9ORLzrWIJad2y/
6X2Bg9MSENfpEUgaS7nK2ML3m1e2poHqBSR+V8VECNs+MDCLSOeQ4FC1TdBKMLfw
NxW88y5Gj6rTRcAXl092ba7stwbqJPBAZu1Eh1jXIp5nrFKh8Jq7kRxmMB5vC70V
fOSPS0RZtEd7D+QZ6jgkFJWsZzn4gJr8nc/kmLcntLw+g/tz9/8lfaV306tLlhMH
dv3Ka6Nt86j6/muOwvoeAkAnCEFAgDcXg4F37PFAiEHRw9DyTeWDuZqvnMZ3gosL
kl15QhnP0yG2QCjSb1gaLcKB42cyxDnPc31WsVuuzQnajazcVf3lJW0CAwEAAaNw
MG4wEwYDVR0lBAwwCgYIKwYBBQUHAwIwHQYDVR0OBBYEFCemMO+Qlj+YCLQ3ScAQ
8XYJJJ5ZMB8GA1UdIwQYMBaAFIj5Ag/IBLEoCIlfI8wAjKFGXB2iMBcGA1UdEQQQ
MA6CDGR1bW15LWNsaWVudDANBgkqhkiG9w0BAQsFAAOCAQEARThbApKvvGDp7uSc
mINaqDOHe69F9PepV0/3+B5+X1b3yd2sbzZL/ZoHl27kajSHVrUF+09gcTosfuY3
omnIPw+NseqTJG+qTMRb3AarLNO46EJZLOowAEhnJyVmhK5uU0YqhV1X9eN+g4/o
BuyOPvHj6UJWviZFy6fDIj2N+ygN/CNP5X3iLDBUoyCEHAehLiQr0aRgsqe4JLlS
P+0l0btTUpcqUhsQy+sD2lv3MO1tZ/P4zhzu0J0LUeLBDdOPf/FIvTgkCNxN9GGy
SLmeBeCzsKmWbzE3Yuahw3h4IblVyyGc7ZDGIobDrZgFqshcZylU8wrsjUnjNSPA
G+LOWQ==
-----END CERTIFICATE-----`
clientKeyPEM = `-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAqiTUcnjRu7KOtL4Mb05EvOtYglp3bL/pfYGD0xIQ1+kRSBpL
ucrYwvebV7amgeoFJH5XxUQI2z4wMItI55DgULVN0Eowt/A3FbzzLkaPqtNFwBeX
T3Ztruy3Buok8EBm7USHWNcinmesUqHwmruRHGYwHm8LvRV85I9LRFm0R3sP5Bnq
OCQUlaxnOfiAmvydz+SYtye0vD6D+3P3/yV9pXfTq0uWEwd2/cpro23zqPr+a47C
+h4CQCcIQUCANxeDgXfs8UCIQdHD0PJN5YO5mq+cxneCiwuSXXlCGc/TIbZAKNJv
WBotwoHjZzLEOc9zfVaxW67NCdqNrNxV/eUlbQIDAQABAoIBAAXZYEhTKPqn58oE
4o6NBUXtXUyV6ZcefdtnsW13KIcTpxlwdfv8IjmJo5h/WfgLYIPhqAjLDvbii2uP
zkDPtTZxFSy88DHSm0IvDbkgid3Yh4RUC0qbCqhB0QT21bBAtokfmvuN4c3KSJ1K
nefj3Ng6Fxtku+WTMIj2+CJwZwcyAH47ZUngYs/77gA0hAJcbdL/bj8Bpmd+lH6C
Ci22T2hrw+cpWMN6qwa3wxWIneCaqxkylSgpUzSNE0QO3mXkX+NYtL2BQ0w+wPqq
lww3QJOFAX1qRLflglL9K+ruTQofm49vxv6apsoqdkrxEBoPzkljlqiPRmzUxau4
cvbApQECgYEAy5m5O3mQt6DBrDRJWSwpZh6DRNd5USEqXOIFtp+Qze2Jx1pYQfZt
NOXOrwy04o0+6yLzc4O4W5ta2KfTlALFzCa6Na3Ca4ZUAeteWprrdh8b1b2w/wUH
E3uQFkvH0zFdPsA3pTTZ0k/ydmHnu4zZqBnSeh0dIW8xFYgZZCgQusECgYEA1e7O
ujCUa8y49sY42D/Y/c8B96xVfJZO5hhY7eLgkzqUlmFl31Ld7AjlJcXpbMeW1vaa
0Mxbfx2qAVaZEkvdnXq3V8spe6qOGBdlKzey4DMEfmEXLFp5DRYCSwpXiqDZcGqc
jwI58wuzKoDgydN9bLdF8XYGtQXnHIE9WyTYMa0CgYBKYSBgb+rEir/2LyvUneOJ
4P/HuIgjcWBOimvX6bc2495/q6uufV4sAwBcxuGWGk+wCxaxTp+dJ8YqfDU5T0H/
cO56Cb6LFYm/IcNYilwWzQqYLTJqF+Yb4fojiw+3QcN01zf87K/eu0IyqVXFGJGz
bauM3PH1cu+VlCDijBiAgQKBgDOQ9YmRriTx2t+41fjiIvbC0BGYG58FSA1UbxMg
LcuvQiOhZIHZIp8DYeCh/Or4jRZRqO2NZLyWNOVPr2Pmn4uXCdyCnwQtD0UlVoB9
U4ORKJMh6gkJ4cXSuUjHPGSw8tiTChu6iKdZ+ZzUJdrgPIpY/uX98g3uV0/aoyR2
FBqdAoGAQIrcOsTpCe6l3ZDtQyNIeAj1s7sZyW8MBy95RXw3y/yzVEOAu4yWNobj
RReeHQEsrQq+sJ/cols8HfoOpGpL3U0IGDi5vr1JlOXmBhFX2xuFrfh3jvgXlUqb
fqxPcT3d7I/UEi0ueDh3osyTn46mDfRfF7HBLBNeyQbIFWBDDus=
-----END RSA PRIVATE KEY-----`
)
var (
initClient sync.Once
client *http.Client
initServiceCertFiles sync.Once
allowedCAFiles []string
serviceCAFiles []string
serviceCertFile string
serviceKeyFile string
) )
func newTestHTTPListener() *HTTPListener { func newTestHTTPListener() *HTTPListener {
@ -38,9 +185,79 @@ func newTestHTTPListener() *HTTPListener {
return listener return listener
} }
func createURL(listener *HTTPListener, path string, rawquery string) string { func newTestHTTPSListener() *HTTPListener {
initServiceCertFiles.Do(func() {
acaf, err := ioutil.TempFile("", "allowedCAFile.crt")
if err != nil {
panic(err)
}
defer acaf.Close()
_, err = io.Copy(acaf, bytes.NewReader([]byte(clientRootPEM)))
allowedCAFiles = []string{acaf.Name()}
scaf, err := ioutil.TempFile("", "serviceCAFile.crt")
if err != nil {
panic(err)
}
defer scaf.Close()
_, err = io.Copy(scaf, bytes.NewReader([]byte(serviceRootPEM)))
serviceCAFiles = []string{scaf.Name()}
scf, err := ioutil.TempFile("", "serviceCertFile.crt")
if err != nil {
panic(err)
}
defer scf.Close()
_, err = io.Copy(scf, bytes.NewReader([]byte(serviceCertPEM)))
serviceCertFile = scf.Name()
skf, err := ioutil.TempFile("", "serviceKeyFile.crt")
if err != nil {
panic(err)
}
defer skf.Close()
_, err = io.Copy(skf, bytes.NewReader([]byte(serviceKeyPEM)))
serviceKeyFile = skf.Name()
})
listener := &HTTPListener{
ServiceAddress: ":0",
TlsAllowedCacerts: allowedCAFiles,
TlsCert: serviceCertFile,
TlsKey: serviceKeyFile,
}
return listener
}
func getHTTPSClient() *http.Client {
initClient.Do(func() {
cas := x509.NewCertPool()
cas.AppendCertsFromPEM([]byte(serviceRootPEM))
clientCert, err := tls.X509KeyPair([]byte(clientCertPEM), []byte(clientKeyPEM))
if err != nil {
panic(err)
}
client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: cas,
Certificates: []tls.Certificate{clientCert},
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
Renegotiation: tls.RenegotiateNever,
InsecureSkipVerify: false,
},
},
}
})
return client
}
func createURL(listener *HTTPListener, scheme string, path string, rawquery string) string {
u := url.URL{ u := url.URL{
Scheme: "http", Scheme: scheme,
Host: "localhost:" + strconv.Itoa(listener.Port), Host: "localhost:" + strconv.Itoa(listener.Port),
Path: path, Path: path,
RawQuery: rawquery, RawQuery: rawquery,
@ -48,6 +265,45 @@ func createURL(listener *HTTPListener, path string, rawquery string) string {
return u.String() return u.String()
} }
func TestWriteHTTPSNoClientAuth(t *testing.T) {
listener := newTestHTTPSListener()
listener.TlsAllowedCacerts = nil
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
cas := x509.NewCertPool()
cas.AppendCertsFromPEM([]byte(serviceRootPEM))
noClientAuthClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: cas,
},
},
}
// post single message to listener
resp, err := noClientAuthClient.Post(createURL(listener, "https", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode)
}
func TestWriteHTTPSWithClientAuth(t *testing.T) {
listener := newTestHTTPSListener()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
// post single message to listener
resp, err := getHTTPSClient().Post(createURL(listener, "https", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode)
}
func TestWriteHTTP(t *testing.T) { func TestWriteHTTP(t *testing.T) {
listener := newTestHTTPListener() listener := newTestHTTPListener()
@ -56,7 +312,7 @@ func TestWriteHTTP(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// post single message to listener // post single message to listener
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -68,7 +324,7 @@ func TestWriteHTTP(t *testing.T) {
) )
// post multiple message to listener // post multiple message to listener
resp, err = http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -84,7 +340,7 @@ func TestWriteHTTP(t *testing.T) {
} }
// Post a gigantic metric to the listener and verify that an error is returned: // Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
@ -105,7 +361,7 @@ func TestWriteHTTPNoNewline(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// post single message to listener // post single message to listener
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -128,7 +384,7 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// Post a gigantic metric to the listener and verify that it writes OK this time: // Post a gigantic metric to the listener and verify that it writes OK this time:
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -144,7 +400,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric))) resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 413, resp.StatusCode) require.EqualValues(t, 413, resp.StatusCode)
@ -160,7 +416,7 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(testMsgs))) resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -186,7 +442,7 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
resp, err := http.Post(createURL(listener, "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
@ -213,7 +469,7 @@ func TestWriteHTTPGzippedData(t *testing.T) {
data, err := ioutil.ReadFile("./testdata/testmsgs.gz") data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
require.NoError(t, err) require.NoError(t, err)
req, err := http.NewRequest("POST", createURL(listener, "/write", ""), bytes.NewBuffer(data)) req, err := http.NewRequest("POST", createURL(listener, "http", "/write", ""), bytes.NewBuffer(data))
require.NoError(t, err) require.NoError(t, err)
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
@ -248,7 +504,7 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
go func(innerwg *sync.WaitGroup) { go func(innerwg *sync.WaitGroup) {
defer innerwg.Done() defer innerwg.Done()
for i := 0; i < 500; i++ { for i := 0; i < 500; i++ {
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -271,7 +527,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// post single message to listener // post single message to listener
resp, err := http.Post(createURL(listener, "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 404, resp.StatusCode) require.EqualValues(t, 404, resp.StatusCode)
@ -285,7 +541,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// post single message to listener // post single message to listener
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(badMsg))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(badMsg)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
@ -299,7 +555,7 @@ func TestWriteHTTPEmpty(t *testing.T) {
defer listener.Stop() defer listener.Stop()
// post single message to listener // post single message to listener
resp, err := http.Post(createURL(listener, "/write", "db=mydb"), "", bytes.NewBuffer([]byte(emptyMsg))) resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(emptyMsg)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -314,12 +570,12 @@ func TestQueryAndPingHTTP(t *testing.T) {
// post query to listener // post query to listener
resp, err := http.Post( resp, err := http.Post(
createURL(listener, "/query", "db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22"), "", nil) createURL(listener, "http", "/query", "db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22"), "", nil)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 200, resp.StatusCode) require.EqualValues(t, 200, resp.StatusCode)
// post ping to listener // post ping to listener
resp, err = http.Post(createURL(listener, "/ping", ""), "", nil) resp, err = http.Post(createURL(listener, "http", "/ping", ""), "", nil)
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
@ -334,7 +590,7 @@ func TestWriteWithPrecision(t *testing.T) {
msg := "xyzzy value=42 1422568543\n" msg := "xyzzy value=42 1422568543\n"
resp, err := http.Post( resp, err := http.Post(
createURL(listener, "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg))) createURL(listener, "http", "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg)))
require.NoError(t, err) require.NoError(t, err)
resp.Body.Close() resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)