From 097b1e09dbf2be1e9dac6b3b26cfee210126f285 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 18 Oct 2016 12:22:23 +0100 Subject: [PATCH 01/12] http listener refactor in this commit: - chunks out the http request body to avoid making very large allocations. - establishes a limit for the maximum http request body size that the listener will accept. - utilizes a pool of byte buffers to reduce GC pressure. --- internal/buffer/buffer.go | 6 + plugins/inputs/http_listener/bufferpool.go | 43 +++ plugins/inputs/http_listener/http_listener.go | 279 +++++++++++++----- .../http_listener/http_listener_test.go | 35 ++- .../http_listener/stoppableListener/LICENSE | 10 - .../stoppableListener/listener.go | 62 ---- plugins/parsers/influx/parser.go | 19 +- 7 files changed, 287 insertions(+), 167 deletions(-) create mode 100644 plugins/inputs/http_listener/bufferpool.go delete mode 100644 plugins/inputs/http_listener/stoppableListener/LICENSE delete mode 100644 plugins/inputs/http_listener/stoppableListener/listener.go diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index b7a05bf03..75899f0f8 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -1,6 +1,8 @@ package buffer import ( + "sync" + "github.com/influxdata/telegraf" ) @@ -11,6 +13,8 @@ type Buffer struct { drops int // total metrics added total int + + sync.Mutex } // NewBuffer returns a Buffer @@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { // the batch will be of maximum length batchSize. It can be less than batchSize, // if the length of Buffer is less than batchSize. 
func (b *Buffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() n := min(len(b.buf), batchSize) out := make([]telegraf.Metric, n) for i := 0; i < n; i++ { out[i] = <-b.buf } + b.Unlock() return out } diff --git a/plugins/inputs/http_listener/bufferpool.go b/plugins/inputs/http_listener/bufferpool.go new file mode 100644 index 000000000..00a93652d --- /dev/null +++ b/plugins/inputs/http_listener/bufferpool.go @@ -0,0 +1,43 @@ +package http_listener + +import ( + "sync/atomic" +) + +type pool struct { + buffers chan []byte + size int + + created int64 +} + +// NewPool returns a new pool object. +// n is the number of buffers +// bufSize is the size (in bytes) of each buffer +func NewPool(n, bufSize int) *pool { + return &pool{ + buffers: make(chan []byte, n), + size: bufSize, + } +} + +func (p *pool) get() []byte { + select { + case b := <-p.buffers: + return b + default: + atomic.AddInt64(&p.created, 1) + return make([]byte, p.size) + } +} + +func (p *pool) put(b []byte) { + select { + case p.buffers <- b: + default: + } +} + +func (p *pool) ncreated() int64 { + return atomic.LoadInt64(&p.created) +} diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 2eeee8e75..b63f62295 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -1,9 +1,9 @@ package http_listener import ( - "bufio" "bytes" - "fmt" + "compress/gzip" + "io" "log" "net" "net/http" @@ -13,135 +13,138 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/inputs/http_listener/stoppableListener" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" +) + +const ( + // DEFAULT_MAX_BODY_SIZE is the default maximum request body size, in bytes. 
+ // if the request body is over this size, we will return an HTTP 413 error. + // 500 MB + DEFAULT_MAX_BODY_SIZE = 500 * 1024 * 1024 + + // MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for + // a single InfluxDB point. + // 64 KB + DEFAULT_MAX_LINE_SIZE = 64 * 1024 ) type HttpListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration + MaxBodySize int64 + MaxLineSize int - sync.Mutex + mu sync.Mutex wg sync.WaitGroup - listener *stoppableListener.StoppableListener + listener net.Listener - parser parsers.Parser + parser influx.InfluxParser acc telegraf.Accumulator + pool *pool } const sampleConfig = ` ## Address and port to host HTTP listener on service_address = ":8186" - ## timeouts + ## maximum duration before timing out read of the request read_timeout = "10s" + ## maximum duration before timing out write of the response write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 536,870,912 bytes (500 mebibytes) + max_body_size = 0 + + ## Maximum line size allowed to be sent in bytes. + ## 0 means to use the default of 65536 bytes (64 kibibytes) + max_line_size = 0 ` -func (t *HttpListener) SampleConfig() string { +func (h *HttpListener) SampleConfig() string { return sampleConfig } -func (t *HttpListener) Description() string { +func (h *HttpListener) Description() string { return "Influx HTTP write listener" } -func (t *HttpListener) Gather(_ telegraf.Accumulator) error { +func (h *HttpListener) Gather(_ telegraf.Accumulator) error { + log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) return nil } -func (t *HttpListener) SetParser(parser parsers.Parser) { - t.parser = parser -} - // Start starts the http listener service. 
-func (t *HttpListener) Start(acc telegraf.Accumulator) error { - t.Lock() - defer t.Unlock() +func (h *HttpListener) Start(acc telegraf.Accumulator) error { + h.mu.Lock() + defer h.mu.Unlock() + h.parser = influx.InfluxParser{} - t.acc = acc + if h.MaxBodySize == 0 { + h.MaxBodySize = DEFAULT_MAX_BODY_SIZE + } + if h.MaxLineSize == 0 { + h.MaxLineSize = DEFAULT_MAX_LINE_SIZE + } - var rawListener, err = net.Listen("tcp", t.ServiceAddress) - if err != nil { - return err - } - t.listener, err = stoppableListener.New(rawListener) + h.acc = acc + h.pool = NewPool(200, h.MaxLineSize) + + var listener, err = net.Listen("tcp", h.ServiceAddress) if err != nil { return err } + h.listener = listener - go t.httpListen() + h.wg.Add(1) + go func() { + defer h.wg.Done() + h.httpListen() + }() - log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress) + log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress) return nil } // Stop cleans up all resources -func (t *HttpListener) Stop() { - t.Lock() - defer t.Unlock() +func (h *HttpListener) Stop() { + h.mu.Lock() + defer h.mu.Unlock() - t.listener.Stop() - t.listener.Close() + h.listener.Close() + h.wg.Wait() - t.wg.Wait() - - log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress) + log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress) } -// httpListen listens for HTTP requests. -func (t *HttpListener) httpListen() error { - if t.ReadTimeout.Duration < time.Second { - t.ReadTimeout.Duration = time.Second * 10 +// httpListen sets up an http.Server and calls server.Serve. +// like server.Serve, httpListen will always return a non-nil error, for this +// reason, the error returned should probably be ignored. 
+// see https://golang.org/pkg/net/http/#Server.Serve +func (h *HttpListener) httpListen() error { + if h.ReadTimeout.Duration < time.Second { + h.ReadTimeout.Duration = time.Second * 10 } - if t.WriteTimeout.Duration < time.Second { - t.WriteTimeout.Duration = time.Second * 10 + if h.WriteTimeout.Duration < time.Second { + h.WriteTimeout.Duration = time.Second * 10 } var server = http.Server{ - Handler: t, - ReadTimeout: t.ReadTimeout.Duration, - WriteTimeout: t.WriteTimeout.Duration, + Handler: h, + ReadTimeout: h.ReadTimeout.Duration, + WriteTimeout: h.WriteTimeout.Duration, } - return server.Serve(t.listener) + return server.Serve(h.listener) } -func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { - t.wg.Add(1) - defer t.wg.Done() - +func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/write": - var http400msg bytes.Buffer - var partial string - scanner := bufio.NewScanner(req.Body) - scanner.Buffer([]byte(""), 128*1024) - for scanner.Scan() { - metrics, err := t.parser.Parse(scanner.Bytes()) - if err == nil { - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - partial = "partial write: " - } else { - http400msg.WriteString(err.Error() + " ") - } - } - - if err := scanner.Err(); err != nil { - http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError) - } else if http400msg.Len() > 0 { - res.Header().Set("Content-Type", "application/json") - res.Header().Set("X-Influxdb-Version", "1.0") - res.WriteHeader(http.StatusBadRequest) - res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String()))) - } else { - res.WriteHeader(http.StatusNoContent) - } + h.serveWrite(res, req) case "/query": // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query @@ -158,8 +161,134 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req 
*http.Request) { } } +func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { + // Check that the content length is not too large for us to handle. + if req.ContentLength > h.MaxBodySize { + tooLarge(res) + return + } + now := time.Now() + + // Handle gzip request bodies + body := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + body, err := gzip.NewReader(req.Body) + defer body.Close() + if err != nil { + log.Println("E! " + err.Error()) + badRequest(res) + return + } + } + body = http.MaxBytesReader(res, body, h.MaxBodySize) + + var return400 bool + var hangingBytes bool + buf := h.pool.get() + defer func() { h.pool.put(buf) }() + bufStart := 0 + for { + n, err := io.ReadFull(body, buf[bufStart:]) + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + log.Println("E! " + err.Error()) + // problem reading the request body + badRequest(res) + return + } + + if err == io.EOF { + if return400 { + badRequest(res) + } else { + res.WriteHeader(http.StatusNoContent) + } + return + } + + if hangingBytes { + i := bytes.IndexByte(buf, '\n') + if i == -1 { + // still didn't find a newline, keep scanning + continue + } + // rotate the bit remaining after the first newline to the front of the buffer + i++ // start copying after the newline + bufStart = len(buf) - i + if bufStart > 0 { + copy(buf, buf[i:]) + } + hangingBytes = false + continue + } + + if err == io.ErrUnexpectedEOF { + // finished reading the request body + if err := h.parse(buf[:n+bufStart], now); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + if return400 { + badRequest(res) + } else { + res.WriteHeader(http.StatusNoContent) + } + return + } + + // if we got down here it means that we filled our buffer, and there + // are still bytes remaining to be read. So we will parse up until the + // final newline, then push the rest of the bytes into the next buffer. 
+ i := bytes.LastIndexByte(buf, '\n') + if i == -1 { + // drop any line longer than the max buffer size + log.Printf("E! http_listener received a single line longer than the maximum of %d bytes", + len(buf)) + hangingBytes = true + return400 = true + bufStart = 0 + continue + } + if err := h.parse(buf[:i], now); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + // rotate the bit remaining after the last newline to the front of the buffer + i++ // start copying after the newline + bufStart = len(buf) - i + if bufStart > 0 { + copy(buf, buf[i:]) + } + } +} + +func (h *HttpListener) parse(b []byte, t time.Time) error { + metrics, err := h.parser.ParseWithDefaultTime(b, t) + + for _, m := range metrics { + h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return err +} + +func tooLarge(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusRequestEntityTooLarge) + res.Write([]byte(`{"error":"http: request body too large"}`)) +} + +func badRequest(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusBadRequest) + res.Write([]byte(`{"error":"http: bad request"}`)) +} + func init() { inputs.Add("http_listener", func() telegraf.Input { - return &HttpListener{} + return &HttpListener{ + ServiceAddress: ":8186", + } }) } diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 267ba56a1..9f021885e 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -1,16 +1,15 @@ package http_listener import ( + "bytes" + "net/http" "sync" "testing" "time" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" - "bytes" "github.com/stretchr/testify/require" - "net/http" ) 
const ( @@ -36,8 +35,6 @@ func newTestHttpListener() *HttpListener { func TestWriteHTTP(t *testing.T) { listener := newTestHttpListener() - parser, _ := parsers.NewInfluxParser() - listener.SetParser(parser) acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -71,10 +68,10 @@ func TestWriteHTTP(t *testing.T) { ) } - // Post a gigantic metric to the listener: + // Post a gigantic metric to the listener and verify that an error is returned: resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) - require.EqualValues(t, 204, resp.StatusCode) + require.EqualValues(t, 400, resp.StatusCode) time.Sleep(time.Millisecond * 15) acc.AssertContainsTaggedFields(t, "cpu_load_short", @@ -83,11 +80,27 @@ func TestWriteHTTP(t *testing.T) { ) } +func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { + listener := &HttpListener{ + ServiceAddress: ":8296", + MaxLineSize: 128 * 1000, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + // Post a gigantic metric to the listener and verify that an error is returned: + resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) +} + // writes 25,000 metrics to the listener with 10 different writers func TestWriteHTTPHighTraffic(t *testing.T) { listener := &HttpListener{ServiceAddress: ":8286"} - parser, _ := parsers.NewInfluxParser() - listener.SetParser(parser) acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -118,7 +131,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) { func TestReceive404ForInvalidEndpoint(t *testing.T) { listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -136,7 +148,6 @@ func 
TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 250) listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -154,7 +165,6 @@ func TestWriteHTTPEmpty(t *testing.T) { time.Sleep(time.Millisecond * 250) listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -172,7 +182,6 @@ func TestQueryAndPingHTTP(t *testing.T) { time.Sleep(time.Millisecond * 250) listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) diff --git a/plugins/inputs/http_listener/stoppableListener/LICENSE b/plugins/inputs/http_listener/stoppableListener/LICENSE deleted file mode 100644 index eb0782451..000000000 --- a/plugins/inputs/http_listener/stoppableListener/LICENSE +++ /dev/null @@ -1,10 +0,0 @@ -Copyright (c) 2014, Eric Urban -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/plugins/inputs/http_listener/stoppableListener/listener.go b/plugins/inputs/http_listener/stoppableListener/listener.go deleted file mode 100644 index 69a9f33cc..000000000 --- a/plugins/inputs/http_listener/stoppableListener/listener.go +++ /dev/null @@ -1,62 +0,0 @@ -package stoppableListener - -import ( - "errors" - "net" - "time" -) - -type StoppableListener struct { - *net.TCPListener //Wrapped listener - stop chan int //Channel used only to indicate listener should shutdown -} - -func New(l net.Listener) (*StoppableListener, error) { - tcpL, ok := l.(*net.TCPListener) - - if !ok { - return nil, errors.New("Cannot wrap listener") - } - - retval := &StoppableListener{} - retval.TCPListener = tcpL - retval.stop = make(chan int) - - return retval, nil -} - -var StoppedError = errors.New("Listener stopped") - -func (sl *StoppableListener) Accept() (net.Conn, error) { - - for { - //Wait up to one second for a new connection - sl.SetDeadline(time.Now().Add(time.Second)) - - newConn, err := sl.TCPListener.Accept() - - //Check for the channel being closed - select { - case <-sl.stop: - return nil, StoppedError - default: - //If the channel is still open, continue as normal - } - - if err != nil { - netErr, ok := err.(net.Error) - - //If this is a timeout, then continue to wait for - //new connections - if ok && netErr.Timeout() && netErr.Temporary() { - continue - } - } - - return newConn, err - } -} - -func (sl *StoppableListener) Stop() { - 
close(sl.stop) -} diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 68b7497fe..8ced6ed50 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -3,6 +3,7 @@ package influx import ( "bytes" "fmt" + "time" "github.com/influxdata/telegraf" @@ -15,15 +16,10 @@ type InfluxParser struct { DefaultTags map[string]string } -// Parse returns a slice of Metrics from a text representation of a -// metric (in line-protocol format) -// with each metric separated by newlines. If any metrics fail to parse, -// a non-nil error will be returned in addition to the metrics that parsed -// successfully. -func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) - points, err := models.ParsePoints(buf) + points, err := models.ParsePointsWithPrecision(buf, t, "n") metrics := make([]telegraf.Metric, len(points)) for i, point := range points { for k, v := range p.DefaultTags { @@ -39,6 +35,15 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, err } +// Parse returns a slice of Metrics from a text representation of a +// metric (in line-protocol format) +// with each metric separated by newlines. If any metrics fail to parse, +// a non-nil error will be returned in addition to the metrics that parsed +// successfully. 
+func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { + return p.ParseWithDefaultTime(buf, time.Now()) +} + func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n")) From c849b58de930907cfb3ffbce397661f5343ab339 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 24 Oct 2016 16:15:46 +0100 Subject: [PATCH 02/12] http_listener input unit tests --- CHANGELOG.md | 2 +- internal/buffer/buffer.go | 6 +- plugins/inputs/http_listener/http_listener.go | 28 ++-- .../http_listener/http_listener_test.go | 129 ++++++++++++++++-- .../inputs/http_listener/testdata/testmsgs.gz | Bin 0 -> 97 bytes 5 files changed, 136 insertions(+), 29 deletions(-) create mode 100644 plugins/inputs/http_listener/testdata/testmsgs.gz diff --git a/CHANGELOG.md b/CHANGELOG.md index cd3b840cf..7bc5e7f8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. - [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. - [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements. - [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin. -- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin. +- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin. - [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql - [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output - [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion. 
diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 75899f0f8..58cd1c376 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -14,7 +14,7 @@ type Buffer struct { // total metrics added total int - sync.Mutex + mu sync.Mutex } // NewBuffer returns a Buffer @@ -65,13 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { // the batch will be of maximum length batchSize. It can be less than batchSize, // if the length of Buffer is less than batchSize. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { - b.Lock() + b.mu.Lock() n := min(len(b.buf), batchSize) out := make([]telegraf.Metric, n) for i := 0; i < n; i++ { out[i] = <-b.buf } - b.Unlock() + b.mu.Unlock() return out } diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index b63f62295..ddc9ac7bf 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -28,7 +28,7 @@ const ( DEFAULT_MAX_LINE_SIZE = 64 * 1024 ) -type HttpListener struct { +type HTTPListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration @@ -63,24 +63,23 @@ const sampleConfig = ` max_line_size = 0 ` -func (h *HttpListener) SampleConfig() string { +func (h *HTTPListener) SampleConfig() string { return sampleConfig } -func (h *HttpListener) Description() string { +func (h *HTTPListener) Description() string { return "Influx HTTP write listener" } -func (h *HttpListener) Gather(_ telegraf.Accumulator) error { +func (h *HTTPListener) Gather(_ telegraf.Accumulator) error { log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) return nil } // Start starts the http listener service. 
-func (h *HttpListener) Start(acc telegraf.Accumulator) error { +func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() - h.parser = influx.InfluxParser{} if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE @@ -110,7 +109,7 @@ func (h *HttpListener) Start(acc telegraf.Accumulator) error { } // Stop cleans up all resources -func (h *HttpListener) Stop() { +func (h *HTTPListener) Stop() { h.mu.Lock() defer h.mu.Unlock() @@ -124,7 +123,7 @@ func (h *HttpListener) Stop() { // like server.Serve, httpListen will always return a non-nil error, for this // reason, the error returned should probably be ignored. // see https://golang.org/pkg/net/http/#Server.Serve -func (h *HttpListener) httpListen() error { +func (h *HTTPListener) httpListen() error { if h.ReadTimeout.Duration < time.Second { h.ReadTimeout.Duration = time.Second * 10 } @@ -141,7 +140,7 @@ func (h *HttpListener) httpListen() error { return server.Serve(h.listener) } -func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { +func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/write": h.serveWrite(res, req) @@ -161,7 +160,7 @@ func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } -func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { +func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { // Check that the content length is not too large for us to handle. if req.ContentLength > h.MaxBodySize { tooLarge(res) @@ -171,8 +170,9 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { // Handle gzip request bodies body := req.Body + var err error if req.Header.Get("Content-Encoding") == "gzip" { - body, err := gzip.NewReader(req.Body) + body, err = gzip.NewReader(req.Body) defer body.Close() if err != nil { log.Println("E! 
" + err.Error()) @@ -185,7 +185,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { var return400 bool var hangingBytes bool buf := h.pool.get() - defer func() { h.pool.put(buf) }() + defer h.pool.put(buf) bufStart := 0 for { n, err := io.ReadFull(body, buf[bufStart:]) @@ -261,7 +261,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { } } -func (h *HttpListener) parse(b []byte, t time.Time) error { +func (h *HTTPListener) parse(b []byte, t time.Time) error { metrics, err := h.parser.ParseWithDefaultTime(b, t) for _, m := range metrics { @@ -287,7 +287,7 @@ func badRequest(res http.ResponseWriter) { func init() { inputs.Add("http_listener", func() telegraf.Input { - return &HttpListener{ + return &HTTPListener{ ServiceAddress: ":8186", } }) diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 9f021885e..84cf209ff 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -2,6 +2,7 @@ package http_listener import ( "bytes" + "io/ioutil" "net/http" "sync" "testing" @@ -26,15 +27,15 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257 emptyMsg = "" ) -func newTestHttpListener() *HttpListener { - listener := &HttpListener{ +func newTestHTTPListener() *HTTPListener { + listener := &HTTPListener{ ServiceAddress: ":8186", } return listener } func TestWriteHTTP(t *testing.T) { - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -81,7 +82,7 @@ func TestWriteHTTP(t *testing.T) { } func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { - listener := &HttpListener{ + listener := &HTTPListener{ ServiceAddress: ":8296", MaxLineSize: 128 * 1000, } @@ -92,15 +93,121 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { time.Sleep(time.Millisecond * 25) - // Post a gigantic metric 
to the listener and verify that an error is returned: + // Post a gigantic metric to the listener and verify that it writes OK this time: resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } +func TestWriteHTTPVerySmallMaxBody(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8297", + MaxBodySize: 4096, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.EqualValues(t, 413, resp.StatusCode) +} + +func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8298", + MaxLineSize: 70, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +func TestWriteHTTPLargeLinesSkipped(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8300", + MaxLineSize: 100, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 400, resp.StatusCode) + + 
time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// test that writing gzipped data works +func TestWriteHTTPGzippedData(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8299", + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + data, err := ioutil.ReadFile("./testdata/testmsgs.gz") + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data)) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "gzip") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 50) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + // writes 25,000 metrics to the listener with 10 different writers func TestWriteHTTPHighTraffic(t *testing.T) { - listener := &HttpListener{ServiceAddress: ":8286"} + listener := &HTTPListener{ServiceAddress: ":8286"} acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -123,14 +230,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) { } wg.Wait() - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 250) listener.Gather(acc) require.Equal(t, int64(25000), int64(acc.NMetrics())) } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, 
listener.Start(acc)) @@ -147,7 +254,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { func TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -164,7 +271,7 @@ func TestWriteHTTPInvalid(t *testing.T) { func TestWriteHTTPEmpty(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -181,7 +288,7 @@ func TestWriteHTTPEmpty(t *testing.T) { func TestQueryAndPingHTTP(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) diff --git a/plugins/inputs/http_listener/testdata/testmsgs.gz b/plugins/inputs/http_listener/testdata/testmsgs.gz new file mode 100644 index 0000000000000000000000000000000000000000..f524dc07128b95fa256b4e0df66bc2b6f04d7058 GIT binary patch literal 97 zcmV-n0G|IJiwFSz6b@Jb14}L_jnBzXOo=bf$S*3<$;dA*u`Nz5DoZUgFj6Q>%qdN^ zH8j#QP%tzxGBP!@Ff}nYH!!j^FfcMT=Ss${*O&smCKTv3r9iJ4A-!Axe?y61Edc-k Db0r~l literal 0 HcmV?d00001 From 662db7a944a44d7b594375b7071c8dfc12618a7d Mon Sep 17 00:00:00 2001 From: Alex Zorin Date: Tue, 25 Oct 2016 18:30:01 +1100 Subject: [PATCH 03/12] Fix panic in internal.Duration UnmarshalTOML --- internal/internal.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 664a1d13b..a278d2da0 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -35,10 +35,13 @@ type Duration struct { // UnmarshalTOML parses the duration from the TOML config file func (d *Duration) UnmarshalTOML(b []byte) error { var err error + // Parse string duration, ie, "1s" - d.Duration, err = time.ParseDuration(string(b[1 : len(b)-1])) - if err 
== nil { - return nil + if uq, err := strconv.Unquote(string(b)); err == nil && len(uq) > 0 { + d.Duration, err = time.ParseDuration(uq) + if err == nil { + return nil + } } // First try parsing as integer seconds From f729fa990d6b355f8d30915460ac0cff50799c6f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 25 Oct 2016 13:11:32 +0100 Subject: [PATCH 04/12] Unit testing for internal.Duration Unmarshal closes #1926 --- internal/internal.go | 6 ++++++ internal/internal_test.go | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/internal/internal.go b/internal/internal.go index a278d2da0..28b37c09f 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -36,6 +36,12 @@ type Duration struct { func (d *Duration) UnmarshalTOML(b []byte) error { var err error + // see if we can straight convert it + d.Duration, err = time.ParseDuration(string(b)) + if err == nil { + return nil + } + // Parse string duration, ie, "1s" if uq, err := strconv.Unquote(string(b)); err == nil && len(uq) > 0 { d.Duration, err = time.ParseDuration(uq) diff --git a/internal/internal_test.go b/internal/internal_test.go index c18991c2d..0d9821857 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -131,3 +131,22 @@ func TestRandomSleep(t *testing.T) { elapsed = time.Since(s) assert.True(t, elapsed < time.Millisecond*150) } + +func TestDuration(t *testing.T) { + var d Duration + + d.UnmarshalTOML([]byte(`"1s"`)) + assert.Equal(t, time.Second, d.Duration) + + d = Duration{} + d.UnmarshalTOML([]byte(`1s`)) + assert.Equal(t, time.Second, d.Duration) + + d = Duration{} + d.UnmarshalTOML([]byte(`10`)) + assert.Equal(t, 10*time.Second, d.Duration) + + d = Duration{} + d.UnmarshalTOML([]byte(`1.5`)) + assert.Equal(t, time.Second, d.Duration) +} From 80d4864844ab1e74f28852b0475532e3ce9415d5 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 25 Oct 2016 13:17:27 +0100 Subject: [PATCH 05/12] Only install fpm,rpm,boto if we need them --- CHANGELOG.md | 
1 + circle.yml | 3 --- scripts/circle-test.sh | 4 ++++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bc5e7f8d..efea23f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. ### Bugfixes +- [#1926](https://github.com/influxdata/telegraf/issues/1926): Fix toml unmarshal panic in Duration objects. - [#1746](https://github.com/influxdata/telegraf/issues/1746): Fix handling of non-string values for JSON keys listed in tag_keys. - [#1628](https://github.com/influxdata/telegraf/issues/1628): Fix mongodb input panic on version 2.2. - [#1733](https://github.com/influxdata/telegraf/issues/1733): Fix statsd scientific notation parsing diff --git a/circle.yml b/circle.yml index 4d5ede725..38445f032 100644 --- a/circle.yml +++ b/circle.yml @@ -12,9 +12,6 @@ machine: dependencies: override: - docker info - post: - - gem install fpm - - sudo apt-get install -y rpm python-boto test: override: diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 662426392..6e7a125c1 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -75,6 +75,10 @@ cat telegraf-race | gzip > $CIRCLE_ARTIFACTS/telegraf-race.gz eval "git describe --exact-match HEAD" if [ $? 
-eq 0 ]; then + # install fpm (packaging dependency) + exit_if_fail gem install fpm + # install boto & rpm (packaging & AWS dependencies) + exit_if_fail sudo apt-get install -y rpm python-boto unset GOGC tag=$(git describe --exact-match HEAD) echo $tag From 393d1299823ad862b9d66eed802f90454e727ad0 Mon Sep 17 00:00:00 2001 From: Priyank Trivedi Date: Tue, 25 Oct 2016 18:27:55 +0530 Subject: [PATCH 06/12] Fix typo from 'Proctstas' to 'Procstat' in procstat plugin's README (#1945) --- plugins/inputs/procstat/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index ef96500a3..d31120743 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -10,7 +10,7 @@ The plugin will tag processes by their PID and their process name. Processes can be specified either by pid file, by executable name, by command line pattern matching, or by username (in this order or priority. Procstat plugin will use `pgrep` when executable name is provided to obtain the pid. -Proctstas plugin will transmit IO, memory, cpu, file descriptor related +Procstat plugin will transmit IO, memory, cpu, file descriptor related measurements for every process specified. A prefix can be set to isolate individual process specific measurements. From 61269c3500cbec0cd3f61871f17f1b876805d196 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 25 Oct 2016 13:47:38 +0100 Subject: [PATCH 07/12] Update config generation docs closes #1925 --- README.md | 71 +++++++++++++++++++--------------------- cmd/telegraf/telegraf.go | 2 +- docs/CONFIGURATION.md | 4 +-- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index d338caaa9..efba9e6c7 100644 --- a/README.md +++ b/README.md @@ -85,45 +85,42 @@ if you don't have it already. You also must build with golang version 1.5+. 
## How to use it: -```console -$ telegraf --help -Telegraf, The plugin-driven server agent for collecting and reporting metrics. +See usage with: -Usage: - - telegraf [commands|flags] - -The commands & flags are: - - config print out full sample configuration to stdout - version print the version to stdout - - --config configuration file to load - --test gather metrics once, print them to stdout, and exit - --config-directory directory containing additional *.conf files - --input-filter filter the input plugins to enable, separator is : - --output-filter filter the output plugins to enable, separator is : - --usage print usage for a plugin, ie, 'telegraf -usage mysql' - --debug print metrics as they're generated to stdout - --quiet run in quiet mode - -Examples: - - # generate a telegraf config file: - telegraf config > telegraf.conf - - # generate config with only cpu input & influxdb output plugins defined - telegraf config -input-filter cpu -output-filter influxdb - - # run a single telegraf collection, outputing metrics to stdout - telegraf --config telegraf.conf -test - - # run telegraf with all plugins defined in config file - telegraf --config telegraf.conf - - # run telegraf, enabling the cpu & memory input, and influxdb output plugins - telegraf --config telegraf.conf -input-filter cpu:mem -output-filter influxdb ``` +telegraf --help +``` + +### Generate a telegraf config file: + +``` +telegraf config > telegraf.conf +``` + +### Generate config with only cpu input & influxdb output plugins defined + +``` +telegraf --input-filter cpu --output-filter influxdb config +``` + +### Run a single telegraf collection, outputting metrics to stdout + +``` +telegraf --config telegraf.conf -test +``` + +### Run telegraf with all plugins defined in config file + +``` +telegraf --config telegraf.conf +``` + +### Run telegraf, enabling the cpu & memory input, and influxdb output plugins + +``` +telegraf --config telegraf.conf -input-filter cpu:mem -output-filter influxdb
+``` + ## Configuration diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 347da1985..0f94c6e2c 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -95,7 +95,7 @@ Examples: telegraf config > telegraf.conf # generate config with only cpu input & influxdb output plugins defined - telegraf config -input-filter cpu -output-filter influxdb + telegraf --input-filter cpu --output-filter influxdb config # run a single telegraf collection, outputing metrics to stdout telegraf --config telegraf.conf -test diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index d05fc987e..9b2eb99d8 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -12,10 +12,10 @@ telegraf config > telegraf.conf ``` To generate a file with specific inputs and outputs, you can use the --input-filter and -output-filter flags: +--input-filter and --output-filter flags: ``` -telegraf config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka +telegraf --input-filter cpu:mem:net:swap --output-filter influxdb:kafka config ``` ## Environment Variables From c66363cba5f003ae68de8b08b9f79c0e9272e149 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 25 Oct 2016 14:49:21 +0100 Subject: [PATCH 08/12] Update Go version: 1.7.1->1.7.3 --- circle.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/circle.yml b/circle.yml index 38445f032..7196c64f1 100644 --- a/circle.yml +++ b/circle.yml @@ -4,9 +4,9 @@ machine: post: - sudo service zookeeper stop - go version - - go version | grep 1.7.1 || sudo rm -rf /usr/local/go - - wget https://storage.googleapis.com/golang/go1.7.1.linux-amd64.tar.gz - - sudo tar -C /usr/local -xzf go1.7.1.linux-amd64.tar.gz + - go version | grep 1.7.3 || sudo rm -rf /usr/local/go + - wget https://storage.googleapis.com/golang/go1.7.3.linux-amd64.tar.gz + - sudo tar -C /usr/local -xzf go1.7.3.linux-amd64.tar.gz - go version dependencies: From b1a97e35b961a99b2fedacc7d1892d65bedbf20f Mon Sep 17 00:00:00 
2001 From: Jonathan Chauncey Date: Wed, 26 Oct 2016 08:47:35 -0400 Subject: [PATCH 09/12] fix(kubernetes): Only initialize RoundTripper once (#1951) fixes #1933 --- plugins/inputs/kubernetes/kubernetes.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index 6b488a7d1..ee95d560f 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net" "net/http" "net/url" "sync" @@ -31,6 +30,8 @@ type Kubernetes struct { SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool + + RoundTripper http.RoundTripper } var sampleConfig = ` @@ -101,15 +102,12 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err return err } - var rt http.RoundTripper = &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 5 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: tlsCfg, - ResponseHeaderTimeout: time.Duration(3 * time.Second), - DisableKeepAlives: false, + if k.RoundTripper == nil { + k.RoundTripper = &http.Transport{ + TLSHandshakeTimeout: 5 * time.Second, + TLSClientConfig: tlsCfg, + ResponseHeaderTimeout: time.Duration(3 * time.Second), + } } if k.BearerToken != "" { @@ -120,7 +118,7 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err req.Header.Set("Authorization", "Bearer "+string(token)) } - resp, err = rt.RoundTrip(req) + resp, err = k.RoundTripper.RoundTrip(req) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) } From 522658bd076eccbc1d2a094860b102a163803501 Mon Sep 17 00:00:00 2001 From: Paulo Pires Date: Wed, 26 Oct 2016 10:45:33 -0400 Subject: [PATCH 10/12] Fix NATS plug-ins reconnection logic (#1955) * NATS output plug-in now retries to reconnect forever 
after a lost connection. * NATS input plug-in now retries to reconnect forever after a lost connection. * Fixes #1953 --- CHANGELOG.md | 1 + plugins/inputs/nats_consumer/nats_consumer.go | 7 +++++++ plugins/outputs/nats/nats.go | 11 ++++++++++- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efea23f16..dfa79ebd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. ### Bugfixes +- [#1955](https://github.com/influxdata/telegraf/issues/1955): Fix NATS plug-ins reconnection logic. - [#1926](https://github.com/influxdata/telegraf/issues/1926): Fix toml unmarshal panic in Duration objects. - [#1746](https://github.com/influxdata/telegraf/issues/1746): Fix handling of non-string values for JSON keys listed in tag_keys. - [#1628](https://github.com/influxdata/telegraf/issues/1628): Fix mongodb input panic on version 2.2. diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index a760d0362..6d59cce28 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -91,8 +91,15 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { var connectErr error + // set default NATS connection options opts := nats.DefaultOptions + + // override max reconnection tries + opts.MaxReconnect = -1 + + // override servers if any were specified opts.Servers = n.Servers + opts.Secure = n.Secure if n.Conn == nil || n.Conn.IsClosed() { diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index 5008937d9..68911be38 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -62,14 +62,23 @@ func (n *NATS) SetSerializer(serializer serializers.Serializer) { func (n *NATS) Connect() error { var err error - // set NATS connection options + + // set default NATS connection options opts := nats_client.DefaultOptions + + // override max 
reconnection tries + opts.MaxReconnect = -1 + + // override servers, if any were specified opts.Servers = n.Servers + + // override authentication, if any was specified if n.Username != "" { opts.User = n.Username opts.Password = n.Password } + // override TLS, if it was specified tlsConfig, err := internal.GetTLSConfig( n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify) if err != nil { From 0cfa0d419a411092bb8051a3495b880e0be8785e Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 26 Oct 2016 19:04:48 +0100 Subject: [PATCH 11/12] udp_listener & tcp_listener set default values closes #1936 --- CHANGELOG.md | 1 + plugins/inputs/tcp_listener/tcp_listener.go | 12 ++++++++---- plugins/inputs/udp_listener/udp_listener.go | 9 ++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfa79ebd8..83aecc135 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. ### Bugfixes - [#1955](https://github.com/influxdata/telegraf/issues/1955): Fix NATS plug-ins reconnection logic. +- [#1936](https://github.com/influxdata/telegraf/issues/1936): Set required default values in udp_listener & tcp_listener. - [#1926](https://github.com/influxdata/telegraf/issues/1926): Fix toml unmarshal panic in Duration objects. - [#1746](https://github.com/influxdata/telegraf/issues/1746): Fix handling of non-string values for JSON keys listed in tag_keys. - [#1628](https://github.com/influxdata/telegraf/issues/1628): Fix mongodb input panic on version 2.2. diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index a8827c037..861442348 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -52,14 +52,14 @@ var malformedwarn = "E! 
tcp_listener has received %d malformed packets" + const sampleConfig = ` ## Address and port to host TCP listener on - service_address = ":8094" + # service_address = ":8094" ## Number of TCP messages allowed to queue up. Once filled, the ## TCP listener will start dropping packets. - allowed_pending_messages = 10000 + # allowed_pending_messages = 10000 ## Maximum number of concurrent TCP connections to allow - max_tcp_connections = 250 + # max_tcp_connections = 250 ## Data format to consume. ## Each data format has it's own unique set of configuration options, read @@ -276,6 +276,10 @@ func (t *TcpListener) remember(id string, conn *net.TCPConn) { func init() { inputs.Add("tcp_listener", func() telegraf.Input { - return &TcpListener{} + return &TcpListener{ + ServiceAddress: ":8094", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + } }) } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index d2c4d0bbc..f8dff5269 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -51,11 +51,11 @@ var malformedwarn = "E! udp_listener has received %d malformed packets" + const sampleConfig = ` ## Address and port to host UDP listener on - service_address = ":8092" + # service_address = ":8092" ## Number of UDP messages allowed to queue up. Once filled, the ## UDP listener will start dropping packets. - allowed_pending_messages = 10000 + # allowed_pending_messages = 10000 ## Data format to consume. 
## Each data format has it's own unique set of configuration options, read @@ -178,6 +178,9 @@ func (u *UdpListener) udpParser() error { func init() { inputs.Add("udp_listener", func() telegraf.Input { - return &UdpListener{} + return &UdpListener{ + ServiceAddress: ":8092", + AllowedPendingMessages: 10000, + } }) } From fc59757a1a70537190c11afcc423eea1c94ff3f3 Mon Sep 17 00:00:00 2001 From: albundy83 Date: Thu, 27 Oct 2016 12:45:17 +0200 Subject: [PATCH 12/12] Just fix typo (#1962) --- plugins/inputs/cassandra/README.md | 64 +++++++++++++++--------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/plugins/inputs/cassandra/README.md b/plugins/inputs/cassandra/README.md index bfbcff77c..edcef08c4 100644 --- a/plugins/inputs/cassandra/README.md +++ b/plugins/inputs/cassandra/README.md @@ -7,7 +7,7 @@ #### Description -The Cassandra plugin collects Cassandra/JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured. +The Cassandra plugin collects Cassandra 3 / JVM metrics exposed as MBean's attributes through jolokia REST endpoint. All metrics are collected for each server configured. 
See: https://jolokia.org/ and [Cassandra Documentation](http://docs.datastax.com/en/cassandra/3.x/cassandra/operations/monitoringCassandraTOC.html) @@ -38,9 +38,9 @@ Here is a list of metrics that might be useful to monitor your cassandra cluster ####measurement = javaGarbageCollector -- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime +- /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionTime - /java.lang:type=GarbageCollector,name=ConcurrentMarkSweep/CollectionCount -- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime +- /java.lang:type=GarbageCollector,name=ParNew/CollectionTime - /java.lang:type=GarbageCollector,name=ParNew/CollectionCount ####measurement = javaMemory @@ -50,13 +50,13 @@ Here is a list of metrics that might be useful to monitor your cassandra cluster ####measurement = cassandraCache -- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hit +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Hits - /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Requests - /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Entries -- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size -- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity -- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hit -- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Size +- /org.apache.cassandra.metrics:type=Cache,scope=KeyCache,name=Capacity +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Hits +- /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Requests - /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Entries - /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Size - /org.apache.cassandra.metrics:type=Cache,scope=RowCache,name=Capacity @@ -67,33 +67,33 @@ Here is a list of metrics that might be useful to 
monitor your cassandra cluster ####measurement = cassandraClientRequest -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=TotalLatency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=TotalLatency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Latency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Timeouts - /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Timeouts -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures -- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Unavailables +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Unavailables +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Read,name=Failures +- /org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Failures ####measurement = cassandraCommitLog -- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks +- /org.apache.cassandra.metrics:type=CommitLog,name=PendingTasks - /org.apache.cassandra.metrics:type=CommitLog,name=TotalCommitLogSize ####measurement = cassandraCompaction -- /org.apache.cassandra.metrics:type=Compaction,name=CompletedTask -- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks +- 
/org.apache.cassandra.metrics:type=Compaction,name=CompletedTasks +- /org.apache.cassandra.metrics:type=Compaction,name=PendingTasks - /org.apache.cassandra.metrics:type=Compaction,name=TotalCompactionsCompleted - /org.apache.cassandra.metrics:type=Compaction,name=BytesCompacted ####measurement = cassandraStorage - /org.apache.cassandra.metrics:type=Storage,name=Load -- /org.apache.cassandra.metrics:type=Storage,name=Exceptions +- /org.apache.cassandra.metrics:type=Storage,name=Exceptions ####measurement = cassandraTable Using wildcards for "keyspace" and "scope" can create a lot of series as metrics will be reported for every table and keyspace including internal system tables. Specify a keyspace name and/or a table name to limit them. @@ -101,25 +101,25 @@ Using wildcards for "keyspace" and "scope" can create a lot of series as metrics - /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=LiveDiskSpaceUsed - /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=TotalDiskSpaceUsed - /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadLatency -- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency -- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency -- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency -- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=CoordinatorReadLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=ReadTotalLatency +- /org.apache.cassandra.metrics:type=Table,keyspace=\*,scope=\*,name=WriteTotalLatency ####measurement = cassandraThreadPools -- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks -- 
/org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=CompactionExecutor,name=ActiveTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=internal,scope=AntiEntropyStage,name=ActiveTasks - /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=PendingTasks - /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=CounterMutationStage,name=CurrentlyBlockedTasks - /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=PendingTasks -- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=MutationStage,name=CurrentlyBlockedTasks - /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=PendingTasks -- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadRepairStage,name=CurrentlyBlockedTasks - /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=PendingTasks -- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks -- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks -- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=ReadStage,name=CurrentlyBlockedTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=PendingTasks +- /org.apache.cassandra.metrics:type=ThreadPools,path=request,scope=RequestResponseStage,name=CurrentlyBlockedTasks