diff --git a/CHANGELOG.md b/CHANGELOG.md index 55b836c7c..e3084fc3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. - [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements. - [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin. +- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin. ### Bugfixes @@ -91,17 +92,6 @@ consistent with the behavior of `collection_jitter`. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data -- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin. - -### Bugfixes - -- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. -- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. - -## v1.0 beta 2 [2016-06-21] - -### Features - - [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. - [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 30f7411aa..9110fd106 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -21,6 +21,7 @@ type HttpListener struct { WriteTimeout internal.Duration sync.Mutex + wg sync.WaitGroup listener *stoppableListener.StoppableListener @@ -84,12 +85,13 @@ func (t *HttpListener) Stop() { t.listener.Stop() t.listener.Close() + t.wg.Wait() + log.Println("Stopped HTTP listener service on ", t.ServiceAddress) } // httpListen listens for HTTP requests. func (t *HttpListener) httpListen() error { - if t.ReadTimeout.Duration < time.Second { t.ReadTimeout.Duration = time.Second * 10 } @@ -107,48 +109,44 @@ func (t *HttpListener) httpListen() error { } func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + t.wg.Add(1) + defer t.wg.Done() body, err := ioutil.ReadAll(req.Body) - if err != nil { log.Printf("Problem reading request: [%s], Error: %s\n", string(body), err) http.Error(res, "ERROR reading request", http.StatusInternalServerError) return } - var path = req.URL.Path[1:] - - if path == "write" { + switch req.URL.Path { + case "/write": var metrics []telegraf.Metric metrics, err = t.parser.Parse(body) if err == nil { - t.storeMetrics(metrics) + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } res.WriteHeader(http.StatusNoContent) } else { log.Printf("Problem parsing body: [%s], Error: %s\n", string(body), err) http.Error(res, "ERROR parsing metrics", http.StatusInternalServerError) } - } else if path == "query" { - // Deliver a dummy response to the query endpoint, as some InfluxDB clients test endpoint availability with a query + case "/query": + // Deliver a dummy response to the query endpoint, as some InfluxDB + // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") res.Header().Set("X-Influxdb-Version", "1.0") res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) - } else { + case "/ping": + // respond to ping requests + res.WriteHeader(http.StatusNoContent) + default: // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } } -func (t *HttpListener) storeMetrics(metrics []telegraf.Metric) error { - t.Lock() - defer t.Unlock() - - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - return nil -} - func init() { inputs.Add("http_listener", func() telegraf.Input { return &HttpListener{} diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 27452e1d6..270e8264a 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -1,10 +1,10 @@ package http_listener import ( + "sync" "testing" "time" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -30,15 +30,14 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257 func newTestHttpListener() *HttpListener { listener := &HttpListener{ ServiceAddress: ":8186", - ReadTimeout: internal.Duration{Duration: time.Second * 10}, - WriteTimeout: internal.Duration{Duration: time.Second * 10}, } return listener } func TestWriteHTTP(t *testing.T) { listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() + parser, _ := parsers.NewInfluxParser() + listener.SetParser(parser) acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -47,7 +46,7 @@ func TestWriteHTTP(t *testing.T) { time.Sleep(time.Millisecond * 25) // post single message to listener - var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) + resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) @@ -73,6 +72,55 @@ func TestWriteHTTP(t *testing.T) { } } +// writes 25,000 metrics to the listener with 10 different writers +func TestWriteHTTPHighTraffic(t *testing.T) { + listener := &HttpListener{ServiceAddress: ":8286"} + parser, _ := parsers.NewInfluxParser() + listener.SetParser(parser) + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + // post many messages to listener + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + for i := 0; i < 500; i++ { + resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + } + wg.Done() + }() + } + + wg.Wait() + time.Sleep(time.Millisecond * 50) + listener.Gather(acc) + + require.Equal(t, int64(25000), int64(acc.NMetrics())) +} + +func TestReceive404ForInvalidEndpoint(t *testing.T) { + listener := newTestHttpListener() + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + // post single message to listener + resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + require.EqualValues(t, 404, resp.StatusCode) +} + func TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 250) @@ -86,7 +134,7 @@ func TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 25) // post single message to listener - var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) + resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) require.NoError(t, err) require.EqualValues(t, 500, resp.StatusCode) } @@ -104,12 +152,12 @@ func TestWriteHTTPEmpty(t *testing.T) { time.Sleep(time.Millisecond * 25) // post single message to listener - var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg))) + resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } -func TestQueryHTTP(t *testing.T) { +func TestQueryAndPingHTTP(t *testing.T) { time.Sleep(time.Millisecond * 250) listener := newTestHttpListener() @@ -122,7 +170,12 @@ func TestQueryHTTP(t *testing.T) { time.Sleep(time.Millisecond * 25) // post query to listener - var resp, err = http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) + resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) require.NoError(t, err) require.EqualValues(t, 200, resp.StatusCode) + + // post ping to listener + resp, err = http.Post("http://localhost:8186/ping", "", nil) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) }