diff --git a/internal/content_coding.go b/internal/content_coding.go new file mode 100644 index 000000000..936dd9562 --- /dev/null +++ b/internal/content_coding.go @@ -0,0 +1,122 @@ +package internal + +import ( + "bytes" + "compress/gzip" + "errors" + "io" +) + +// NewContentEncoder returns a ContentEncoder for the encoding type. +func NewContentEncoder(encoding string) (ContentEncoder, error) { + switch encoding { + case "gzip": + return NewGzipEncoder() + + case "identity", "": + return NewIdentityEncoder(), nil + default: + return nil, errors.New("invalid value for content_encoding") + } +} + +// NewContentDecoder returns a ContentDecoder for the encoding type. +func NewContentDecoder(encoding string) (ContentDecoder, error) { + switch encoding { + case "gzip": + return NewGzipDecoder() + case "identity", "": + return NewIdentityDecoder(), nil + default: + return nil, errors.New("invalid value for content_encoding") + } +} + +// ContentEncoder applies a wrapper encoding to byte buffers. +type ContentEncoder interface { + Encode([]byte) ([]byte, error) +} + +// GzipEncoder compresses the buffer using gzip at the default level. +type GzipEncoder struct { + writer *gzip.Writer + buf *bytes.Buffer +} + +func NewGzipEncoder() (*GzipEncoder, error) { + var buf bytes.Buffer + return &GzipEncoder{ + writer: gzip.NewWriter(&buf), + buf: &buf, + }, nil +} + +func (e *GzipEncoder) Encode(data []byte) ([]byte, error) { + e.buf.Reset() + e.writer.Reset(e.buf) + + _, err := e.writer.Write(data) + if err != nil { + return nil, err + } + err = e.writer.Close() + if err != nil { + return nil, err + } + return e.buf.Bytes(), nil +} + +// IdentityEncoder is a null encoder that applies no transformation. +type IdentityEncoder struct{} + +func NewIdentityEncoder() *IdentityEncoder { + return &IdentityEncoder{} +} + +func (*IdentityEncoder) Encode(data []byte) ([]byte, error) { + return data, nil +} + +// ContentDecoder removes a wrapper encoding from byte buffers. +type ContentDecoder interface { + Decode([]byte) ([]byte, error) +} + +// GzipDecoder decompresses buffers with gzip compression. +type GzipDecoder struct { + reader *gzip.Reader + buf *bytes.Buffer +} + +func NewGzipDecoder() (*GzipDecoder, error) { + return &GzipDecoder{ + reader: new(gzip.Reader), + buf: new(bytes.Buffer), + }, nil +} + +func (d *GzipDecoder) Decode(data []byte) ([]byte, error) { + d.reader.Reset(bytes.NewBuffer(data)) + d.buf.Reset() + + _, err := d.buf.ReadFrom(d.reader) + if err != nil && err != io.EOF { + return nil, err + } + err = d.reader.Close() + if err != nil { + return nil, err + } + return d.buf.Bytes(), nil +} + +// IdentityDecoder is a null decoder that returns the input. +type IdentityDecoder struct{} + +func NewIdentityDecoder() *IdentityDecoder { + return &IdentityDecoder{} +} + +func (*IdentityDecoder) Decode(data []byte) ([]byte, error) { + return data, nil +} diff --git a/internal/content_coding_test.go b/internal/content_coding_test.go new file mode 100644 index 000000000..031633112 --- /dev/null +++ b/internal/content_coding_test.go @@ -0,0 +1,58 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGzipEncodeDecode(t *testing.T) { + enc, err := NewGzipEncoder() + require.NoError(t, err) + dec, err := NewGzipDecoder() + require.NoError(t, err) + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) +} + +func TestGzipReuse(t *testing.T) { + enc, err := NewGzipEncoder() + require.NoError(t, err) + dec, err := NewGzipDecoder() + require.NoError(t, err) + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) + + payload, err = enc.Encode([]byte("doody")) + require.NoError(t, err) + + actual, err = dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "doody", string(actual)) +} + +func TestIdentityEncodeDecode(t *testing.T) { + enc := NewIdentityEncoder() + dec := NewIdentityDecoder() + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) +} diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index ca1af800c..84371ba4d 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -77,6 +77,10 @@ The following defaults are known to work with RabbitMQ: ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + # content_encoding = "identity" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index d80a3683b..994a3736a 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -52,12 +53,15 @@ type AMQPConsumer struct { AuthMethod string tls.ClientConfig + ContentEncoding string `toml:"content_encoding"` + deliveries map[telegraf.TrackingID]amqp.Delivery - parser parsers.Parser - conn *amqp.Connection - wg *sync.WaitGroup - cancel context.CancelFunc + parser parsers.Parser + conn *amqp.Connection + wg *sync.WaitGroup + cancel context.CancelFunc + decoder internal.ContentDecoder } type externalAuth struct{} @@ -147,6 +151,10 @@ func (a *AMQPConsumer) SampleConfig() string { ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + # content_encoding = "identity" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -201,6 +209,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { return err } + a.decoder, err = internal.NewContentDecoder(a.ContentEncoding) + if err != nil { + return err + } + msgs, err := a.connect(amqpConf) if err != nil { return err @@ -428,8 +441,7 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a } func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error { - metrics, err := a.parser.Parse(d.Body) - if err != nil { + onError := func() { // Discard the message from the queue; will never be able to process // this message. rejErr := d.Ack(false) @@ -438,6 +450,17 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive d.DeliveryTag, rejErr) a.conn.Close() } + } + + body, err := a.decoder.Decode(d.Body) + if err != nil { + onError() + return err + } + + metrics, err := a.parser.Parse(body) + if err != nil { + onError() return err } diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index fe44ea4ed..68470a2c0 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -92,6 +92,14 @@ For an introduction to AMQP see: ## Recommended to set to true. # use_batch_format = false + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + ## + ## Please note that when use_batch_format = false each amqp message contains only + ## a single metric, it is recommended to use compression with batch format + ## for best results. + # content_encoding = "identity" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 56e1e13ef..4350f2e74 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -54,6 +54,7 @@ type AMQP struct { Headers map[string]string `toml:"headers"` Timeout internal.Duration `toml:"timeout"` UseBatchFormat bool `toml:"use_batch_format"` + ContentEncoding string `toml:"content_encoding"` tls.ClientConfig serializer serializers.Serializer @@ -61,6 +62,7 @@ type AMQP struct { client Client config *ClientConfig sentMessages int + encoder internal.ContentEncoder } type Client interface { @@ -149,6 +151,14 @@ var sampleConfig = ` ## Recommended to set to true. # use_batch_format = false + ## Content encoding for message payloads, can be set to "gzip" to or + ## "identity" to apply no encoding. + ## + ## Please note that when use_batch_format = false each amqp message contains only + ## a single metric, it is recommended to use compression with batch format + ## for best results. + # content_encoding = "identity" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -177,11 +187,16 @@ func (q *AMQP) Connect() error { q.config = config } - client, err := q.connect(q.config) + var err error + q.encoder, err = internal.NewContentEncoder(q.ContentEncoding) + if err != nil { + return err + } + + q.client, err = q.connect(q.config) if err != nil { return err } - q.client = client return nil } @@ -227,6 +242,11 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { return err } + body, err = q.encoder.Encode(body) + if err != nil { + return err + } + err = q.publish(key, body) if err != nil { // If this is the first attempt to publish and the connection is @@ -298,6 +318,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) { exchange: q.Exchange, exchangeType: q.ExchangeType, exchangePassive: q.ExchangePassive, + encoding: q.ContentEncoding, timeout: q.Timeout.Duration, } diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go index 0ee45d950..5e0dc3b49 100644 --- a/plugins/outputs/amqp/client.go +++ b/plugins/outputs/amqp/client.go @@ -19,6 +19,7 @@ type ClientConfig struct { exchangePassive bool exchangeDurable bool exchangeArguments amqp.Table + encoding string headers amqp.Table deliveryMode uint8 tlsConfig *tls.Config @@ -114,10 +115,11 @@ func (c *client) Publish(key string, body []byte) error { false, // mandatory false, // immediate amqp.Publishing{ - Headers: c.config.headers, - ContentType: "text/plain", - Body: body, - DeliveryMode: c.config.deliveryMode, + Headers: c.config.headers, + ContentType: "text/plain", + ContentEncoding: c.config.encoding, + Body: body, + DeliveryMode: c.config.deliveryMode, }) }