diff --git a/internal/internal.go b/internal/internal.go index f6b85de84..567b0f773 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -16,6 +16,8 @@ import ( "syscall" "time" "unicode" + + "github.com/alecthomas/units" ) const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -36,6 +38,11 @@ type Duration struct { Duration time.Duration } +// Size just wraps an int64 +type Size struct { + Size int64 +} + // SetVersion sets the telegraf agent version func SetVersion(v string) error { if version != "" { @@ -85,6 +92,27 @@ func (d *Duration) UnmarshalTOML(b []byte) error { return nil } +func (s *Size) UnmarshalTOML(b []byte) error { + var err error + b = bytes.Trim(b, `'`) + + val, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + s.Size = val + return nil + } + uq, err := strconv.Unquote(string(b)) + if err != nil { + return err + } + val, err = units.ParseStrictBytes(uq) + if err != nil { + return err + } + s.Size = val + return nil +} + // ReadLines reads contents from a file and splits them by new lines. // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). func ReadLines(filename string) ([]string, error) { diff --git a/internal/internal_test.go b/internal/internal_test.go index 486c3d744..89ee06903 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -166,6 +166,29 @@ func TestDuration(t *testing.T) { assert.Equal(t, time.Second, d.Duration) } +func TestSize(t *testing.T) { + var s Size + + s.UnmarshalTOML([]byte(`"1B"`)) + assert.Equal(t, int64(1), s.Size) + + s = Size{} + s.UnmarshalTOML([]byte(`1`)) + assert.Equal(t, int64(1), s.Size) + + s = Size{} + s.UnmarshalTOML([]byte(`'1'`)) + assert.Equal(t, int64(1), s.Size) + + s = Size{} + s.UnmarshalTOML([]byte(`"1GB"`)) + assert.Equal(t, int64(1000*1000*1000), s.Size) + + s = Size{} + s.UnmarshalTOML([]byte(`"12GiB"`)) + assert.Equal(t, int64(12*1024*1024*1024), s.Size) +} + func TestCompressWithGzip(t *testing.T) { testData := "the quick brown fox jumps over the lazy dog" inputBuffer := bytes.NewBuffer([]byte(testData)) diff --git a/plugins/inputs/filecount/README.md b/plugins/inputs/filecount/README.md index ccec532aa..cf11b7d90 100644 --- a/plugins/inputs/filecount/README.md +++ b/plugins/inputs/filecount/README.md @@ -19,10 +19,11 @@ Counts files in directories that match certain criteria. ## Only count regular files. Defaults to true. regular_only = true - ## Only count files that are at least this size in bytes. If size is + ## Only count files that are at least this size. If size is ## a negative number, only count files that are smaller than the - ## absolute value of size. Defaults to 0. - size = 0 + ## absolute value of size. Acceptable units are B, KiB, MiB, KB, ... + ## Without quotes and units, interpreted as size in bytes. + size = "0B" ## Only count files that have not been touched for at least this ## duration. If mtime is negative, only count files that have been diff --git a/plugins/inputs/filecount/filecount.go b/plugins/inputs/filecount/filecount.go index 66d5a33fe..d613f3b77 100644 --- a/plugins/inputs/filecount/filecount.go +++ b/plugins/inputs/filecount/filecount.go @@ -34,10 +34,11 @@ const sampleConfig = ` ## Only count regular files. Defaults to true. regular_only = true - ## Only count files that are at least this size in bytes. If size is + ## Only count files that are at least this size. If size is ## a negative number, only count files that are smaller than the - ## absolute value of size. Defaults to 0. - size = 0 + ## absolute value of size. Acceptable units are B, KiB, MiB, KB, ... + ## Without quotes and units, interpreted as size in bytes. + size = "0B" ## Only count files that have not been touched for at least this ## duration. If mtime is negative, only count files that have been @@ -51,7 +52,7 @@ type FileCount struct { Name string Recursive bool RegularOnly bool - Size int64 + Size internal.Size MTime internal.Duration `toml:"mtime"` fileFilters []fileFilterFunc } @@ -99,7 +100,7 @@ func (fc *FileCount) regularOnlyFilter() fileFilterFunc { } func (fc *FileCount) sizeFilter() fileFilterFunc { - if fc.Size == 0 { + if fc.Size.Size == 0 { return nil } @@ -107,10 +108,10 @@ func (fc *FileCount) sizeFilter() fileFilterFunc { if !f.Mode().IsRegular() { return false, nil } - if fc.Size < 0 { - return f.Size() < -fc.Size, nil + if fc.Size.Size < 0 { + return f.Size() < -fc.Size.Size, nil } - return f.Size() >= fc.Size, nil + return f.Size() >= fc.Size.Size, nil } } @@ -257,7 +258,7 @@ func NewFileCount() *FileCount { Name: "*", Recursive: true, RegularOnly: true, - Size: 0, + Size: internal.Size{Size: 0}, MTime: internal.Duration{Duration: 0}, fileFilters: nil, } diff --git a/plugins/inputs/filecount/filecount_test.go b/plugins/inputs/filecount/filecount_test.go index 16bb83de5..7a48c2166 100644 --- a/plugins/inputs/filecount/filecount_test.go +++ b/plugins/inputs/filecount/filecount_test.go @@ -70,7 +70,7 @@ func TestRegularOnlyFilter(t *testing.T) { func TestSizeFilter(t *testing.T) { fc := getNoFilterFileCount("testdata") - fc.Size = -100 + fc.Size = internal.Size{Size: -100} matches := []string{"foo", "bar", "baz", "subdir/quux", "subdir/quuz"} acc := testutil.Accumulator{} @@ -78,7 +78,7 @@ func TestSizeFilter(t *testing.T) { require.True(t, assertFileCount(&acc, "testdata", len(matches))) - fc.Size = 100 + fc.Size = internal.Size{Size: 100} matches = []string{"qux"} acc = testutil.Accumulator{} @@ -119,7 +119,7 @@ func getNoFilterFileCount(dir string) FileCount { Name: "*", Recursive: true, RegularOnly: false, - Size: 0, + Size: internal.Size{Size: 0}, MTime: internal.Duration{Duration: 0}, fileFilters: nil, } diff --git a/plugins/inputs/http_listener_v2/README.md b/plugins/inputs/http_listener_v2/README.md index 6d5d25aa4..f5a853189 100644 --- a/plugins/inputs/http_listener_v2/README.md +++ b/plugins/inputs/http_listener_v2/README.md @@ -28,8 +28,8 @@ This is a sample configuration for the plugin. # write_timeout = "10s" ## Maximum allowed http request body size in bytes. - ## 0 means to use the default of 536,870,912 bytes (500 mebibytes) - # max_body_size = 0 + ## 0 means to use the default of 524,288,000 bytes (500 mebibytes) + # max_body_size = "500MB" ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index 871d1080b..3fd8989f9 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -31,7 +31,7 @@ type HTTPListenerV2 struct { Methods []string ReadTimeout internal.Duration WriteTimeout internal.Duration - MaxBodySize int64 + MaxBodySize internal.Size Port int tlsint.ServerConfig @@ -65,8 +65,8 @@ const sampleConfig = ` # write_timeout = "10s" ## Maximum allowed http request body size in bytes. - ## 0 means to use the default of 536,870,912 bytes (500 mebibytes) - # max_body_size = 0 + ## 0 means to use the default of 524,288,00 bytes (500 mebibytes) + # max_body_size = "500MB" ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections @@ -106,8 +106,8 @@ func (h *HTTPListenerV2) SetParser(parser parsers.Parser) { // Start starts the http listener service. func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error { - if h.MaxBodySize == 0 { - h.MaxBodySize = defaultMaxBodySize + if h.MaxBodySize.Size == 0 { + h.MaxBodySize.Size = defaultMaxBodySize } if h.ReadTimeout.Duration < time.Second { @@ -173,7 +173,7 @@ func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) { // Check that the content length is not too large for us to handle. - if req.ContentLength > h.MaxBodySize { + if req.ContentLength > h.MaxBodySize.Size { tooLarge(res) return } @@ -204,7 +204,7 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) defer body.Close() } - body = http.MaxBytesReader(res, body, h.MaxBodySize) + body = http.MaxBytesReader(res, body, h.MaxBodySize.Size) bytes, err := ioutil.ReadAll(body) if err != nil { tooLarge(res) diff --git a/plugins/inputs/http_listener_v2/http_listener_v2_test.go b/plugins/inputs/http_listener_v2/http_listener_v2_test.go index 3287ea59e..ab0c89f81 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2_test.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -51,7 +52,7 @@ func newTestHTTPListenerV2() *HTTPListenerV2 { Methods: []string{"POST"}, Parser: parser, TimeFunc: time.Now, - MaxBodySize: 70000, + MaxBodySize: internal.Size{Size: 70000}, } return listener } @@ -234,7 +235,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) { Path: "/write", Methods: []string{"POST"}, Parser: parser, - MaxBodySize: int64(len(hugeMetric)), + MaxBodySize: internal.Size{Size: int64(len(hugeMetric))}, TimeFunc: time.Now, } @@ -256,7 +257,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { Path: "/write", Methods: []string{"POST"}, Parser: parser, - MaxBodySize: 4096, + MaxBodySize: internal.Size{Size: 4096}, TimeFunc: time.Now, } diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index 29beff9a8..b8abeecd7 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -39,8 +39,8 @@ type HTTPListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration - MaxBodySize int64 - MaxLineSize int + MaxBodySize internal.Size + MaxLineSize internal.Size Port int tlsint.ServerConfig @@ -84,12 +84,12 @@ const sampleConfig = ` write_timeout = "10s" ## Maximum allowed http request body size in bytes. - ## 0 means to use the default of 536,870,912 bytes (500 mebibytes) - max_body_size = 0 + ## 0 means to use the default of 524,288,000 bytes (500 mebibytes) + max_body_size = "500MiB" ## Maximum line size allowed to be sent in bytes. ## 0 means to use the default of 65536 bytes (64 kibibytes) - max_line_size = 0 + max_line_size = "64KiB" ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections @@ -139,11 +139,11 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.BuffersCreated = selfstat.Register("http_listener", "buffers_created", tags) h.AuthFailures = selfstat.Register("http_listener", "auth_failures", tags) - if h.MaxBodySize == 0 { - h.MaxBodySize = DEFAULT_MAX_BODY_SIZE + if h.MaxBodySize.Size == 0 { + h.MaxBodySize.Size = DEFAULT_MAX_BODY_SIZE } - if h.MaxLineSize == 0 { - h.MaxLineSize = DEFAULT_MAX_LINE_SIZE + if h.MaxLineSize.Size == 0 { + h.MaxLineSize.Size = DEFAULT_MAX_LINE_SIZE } if h.ReadTimeout.Duration < time.Second { @@ -154,7 +154,7 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { } h.acc = acc - h.pool = NewPool(200, h.MaxLineSize) + h.pool = NewPool(200, int(h.MaxLineSize.Size)) tlsConf, err := h.ServerConfig.TLSConfig() if err != nil { @@ -241,7 +241,7 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { // Check that the content length is not too large for us to handle. - if req.ContentLength > h.MaxBodySize { + if req.ContentLength > h.MaxBodySize.Size { tooLarge(res) return } @@ -261,7 +261,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { return } } - body = http.MaxBytesReader(res, body, h.MaxBodySize) + body = http.MaxBytesReader(res, body, h.MaxBodySize.Size) var return400 bool var hangingBytes bool diff --git a/plugins/inputs/influxdb_listener/http_listener_test.go b/plugins/inputs/influxdb_listener/http_listener_test.go index 3277e5344..964295061 100644 --- a/plugins/inputs/influxdb_listener/http_listener_test.go +++ b/plugins/inputs/influxdb_listener/http_listener_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -217,7 +218,7 @@ func TestWriteHTTPNoNewline(t *testing.T) { func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { listener := &HTTPListener{ ServiceAddress: "localhost:0", - MaxLineSize: 128 * 1000, + MaxLineSize: internal.Size{Size: 128 * 1000}, TimeFunc: time.Now, } @@ -235,7 +236,7 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { func TestWriteHTTPVerySmallMaxBody(t *testing.T) { listener := &HTTPListener{ ServiceAddress: "localhost:0", - MaxBodySize: 4096, + MaxBodySize: internal.Size{Size: 4096}, TimeFunc: time.Now, } @@ -252,7 +253,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { listener := &HTTPListener{ ServiceAddress: "localhost:0", - MaxLineSize: 70, + MaxLineSize: internal.Size{Size: 70}, TimeFunc: time.Now, } @@ -279,7 +280,7 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { func TestWriteHTTPLargeLinesSkipped(t *testing.T) { listener := &HTTPListener{ ServiceAddress: "localhost:0", - MaxLineSize: 100, + MaxLineSize: internal.Size{Size: 100}, TimeFunc: time.Now, } diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index ff73b1fbb..2f1a0572e 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -42,11 +42,11 @@ This is a sample configuration for the plugin. ## Enables client authentication if set. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - ## Maximum socket buffer size in bytes. + ## Maximum socket buffer size (in bytes when no unit specified). ## For stream sockets, once the buffer fills up, the sender will start backing up. ## For datagram sockets, once the buffer fills up, metrics will start dropping. ## Defaults to the OS default. - # read_buffer_size = 65535 + # read_buffer_size = "64KiB" ## Period between keep alive probes. ## Only applies to TCP sockets. diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index daab84952..73c321f81 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -47,9 +47,9 @@ func (ssl *streamSocketListener) listen() { break } - if ssl.ReadBufferSize > 0 { + if ssl.ReadBufferSize.Size > 0 { if srb, ok := c.(setReadBufferer); ok { - srb.SetReadBuffer(ssl.ReadBufferSize) + srb.SetReadBuffer(int(ssl.ReadBufferSize.Size)) } else { log.Printf("W! Unable to set read buffer on a %s socket", ssl.sockType) } @@ -164,7 +164,7 @@ func (psl *packetSocketListener) listen() { type SocketListener struct { ServiceAddress string `toml:"service_address"` MaxConnections int `toml:"max_connections"` - ReadBufferSize int `toml:"read_buffer_size"` + ReadBufferSize internal.Size `toml:"read_buffer_size"` ReadTimeout *internal.Duration `toml:"read_timeout"` KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"` tlsint.ServerConfig @@ -209,11 +209,11 @@ func (sl *SocketListener) SampleConfig() string { ## Enables client authentication if set. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - ## Maximum socket buffer size in bytes. + ## Maximum socket buffer size (in bytes when no unit specified). ## For stream sockets, once the buffer fills up, the sender will start backing up. ## For datagram sockets, once the buffer fills up, metrics will start dropping. ## Defaults to the OS default. - # read_buffer_size = 65535 + # read_buffer_size = "64KiB" ## Period between keep alive probes. ## Only applies to TCP sockets. @@ -286,9 +286,9 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return err } - if sl.ReadBufferSize > 0 { + if sl.ReadBufferSize.Size > 0 { if srb, ok := pc.(setReadBufferer); ok { - srb.SetReadBuffer(sl.ReadBufferSize) + srb.SetReadBuffer(int(sl.ReadBufferSize.Size)) } else { log.Printf("W! Unable to set read buffer on a %s socket", spl[0]) } diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 26691ef54..ae7fef8b9 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -81,7 +82,7 @@ func TestSocketListener_tcp(t *testing.T) { sl := newSocketListener() sl.ServiceAddress = "tcp://127.0.0.1:0" - sl.ReadBufferSize = 1024 + sl.ReadBufferSize = internal.Size{Size: 1024} acc := &testutil.Accumulator{} err := sl.Start(acc) @@ -99,7 +100,7 @@ func TestSocketListener_udp(t *testing.T) { sl := newSocketListener() sl.ServiceAddress = "udp://127.0.0.1:0" - sl.ReadBufferSize = 1024 + sl.ReadBufferSize = internal.Size{Size: 1024} acc := &testutil.Accumulator{} err := sl.Start(acc) @@ -123,7 +124,7 @@ func TestSocketListener_unix(t *testing.T) { os.Create(sock) sl := newSocketListener() sl.ServiceAddress = "unix://" + sock - sl.ReadBufferSize = 1024 + sl.ReadBufferSize = internal.Size{Size: 1024} acc := &testutil.Accumulator{} err = sl.Start(acc) @@ -147,7 +148,7 @@ func TestSocketListener_unixgram(t *testing.T) { os.Create(sock) sl := newSocketListener() sl.ServiceAddress = "unixgram://" + sock - sl.ReadBufferSize = 1024 + sl.ReadBufferSize = internal.Size{Size: 1024} acc := &testutil.Accumulator{} err = sl.Start(acc) diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index e9b3b0346..5d223ca3d 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -42,7 +42,7 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser # user_agent = "telegraf" ## UDP payload size is the maximum packet size to send. - # udp_payload = 512 + # udp_payload = "512B" ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 06079dfc5..1f61b801f 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -41,7 +41,7 @@ type InfluxDB struct { RetentionPolicy string WriteConsistency string Timeout internal.Duration - UDPPayload int `toml:"udp_payload"` + UDPPayload internal.Size `toml:"udp_payload"` HTTPProxy string `toml:"http_proxy"` HTTPHeaders map[string]string `toml:"http_headers"` ContentEncoding string `toml:"content_encoding"` @@ -95,7 +95,7 @@ var sampleConfig = ` # user_agent = "telegraf" ## UDP payload size is the maximum packet size to send. - # udp_payload = 512 + # udp_payload = "512B" ## Optional TLS Config for use on HTTP connections. # tls_ca = "/etc/telegraf/ca.pem" @@ -225,7 +225,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) udpClient(url *url.URL) (Client, error) { config := &UDPConfig{ URL: url, - MaxPayloadSize: i.UDPPayload, + MaxPayloadSize: int(i.UDPPayload.Size), Serializer: i.serializer, } diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 3ec10989e..63ecc47be 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -74,7 +74,7 @@ func TestConnectUDPConfig(t *testing.T) { output := influxdb.InfluxDB{ URLs: []string{"udp://localhost:8089"}, - UDPPayload: 42, + UDPPayload: internal.Size{Size: 42}, CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { actual = config