Add a 404 and high-traffic test to http listener

also remove locking around adding metrics. Instead, keep a waitgroup on
the ServeHTTP function and wait for that to finish before returning from
the Stop() function

closes #1407
This commit is contained in:
Cameron Sparr 2016-09-06 15:09:37 +01:00
parent 67c288abda
commit 301c79e57c
3 changed files with 80 additions and 39 deletions

View File

@ -17,6 +17,7 @@
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
### Bugfixes
@ -91,17 +92,6 @@ consistent with the behavior of `collection_jitter`.
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
### Bugfixes
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
## v1.0 beta 2 [2016-06-21]
### Features
- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.
- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection.
- [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine

View File

@ -21,6 +21,7 @@ type HttpListener struct {
WriteTimeout internal.Duration
sync.Mutex
wg sync.WaitGroup
listener *stoppableListener.StoppableListener
@ -84,12 +85,13 @@ func (t *HttpListener) Stop() {
t.listener.Stop()
t.listener.Close()
t.wg.Wait()
log.Println("Stopped HTTP listener service on ", t.ServiceAddress)
}
// httpListen listens for HTTP requests.
func (t *HttpListener) httpListen() error {
if t.ReadTimeout.Duration < time.Second {
t.ReadTimeout.Duration = time.Second * 10
}
@ -107,48 +109,44 @@ func (t *HttpListener) httpListen() error {
}
func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
body, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Printf("Problem reading request: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR reading request", http.StatusInternalServerError)
return
}
var path = req.URL.Path[1:]
if path == "write" {
switch req.URL.Path {
case "/write":
var metrics []telegraf.Metric
metrics, err = t.parser.Parse(body)
if err == nil {
t.storeMetrics(metrics)
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
res.WriteHeader(http.StatusNoContent)
} else {
log.Printf("Problem parsing body: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR parsing metrics", http.StatusInternalServerError)
}
} else if path == "query" {
// Deliver a dummy response to the query endpoint, as some InfluxDB clients test endpoint availability with a query
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}"))
} else {
case "/ping":
// respond to ping requests
res.WriteHeader(http.StatusNoContent)
default:
// Don't know how to respond to calls to other endpoints
http.NotFound(res, req)
}
}
func (t *HttpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}

View File

@ -1,10 +1,10 @@
package http_listener
import (
"sync"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
@ -30,15 +30,14 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
func newTestHttpListener() *HttpListener {
listener := &HttpListener{
ServiceAddress: ":8186",
ReadTimeout: internal.Duration{Duration: time.Second * 10},
WriteTimeout: internal.Duration{Duration: time.Second * 10},
}
return listener
}
func TestWriteHTTP(t *testing.T) {
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -47,7 +46,7 @@ func TestWriteHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 25)
// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
@ -73,6 +72,55 @@ func TestWriteHTTP(t *testing.T) {
}
}
// writes 25,000 metrics to the listener with 10 different writers
func TestWriteHTTPHighTraffic(t *testing.T) {
listener := &HttpListener{ServiceAddress: ":8286"}
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post many messages to listener
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 500; i++ {
resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(time.Millisecond * 50)
listener.Gather(acc)
require.Equal(t, int64(25000), int64(acc.NMetrics()))
}
func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener
resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
require.EqualValues(t, 404, resp.StatusCode)
}
func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 250)
@ -86,7 +134,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 25)
// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
require.NoError(t, err)
require.EqualValues(t, 500, resp.StatusCode)
}
@ -104,12 +152,12 @@ func TestWriteHTTPEmpty(t *testing.T) {
time.Sleep(time.Millisecond * 25)
// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
func TestQueryHTTP(t *testing.T) {
func TestQueryAndPingHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHttpListener()
@ -122,7 +170,12 @@ func TestQueryHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 25)
// post query to listener
var resp, err = http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
require.NoError(t, err)
require.EqualValues(t, 200, resp.StatusCode)
// post ping to listener
resp, err = http.Post("http://localhost:8186/ping", "", nil)
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}