Add support for gzip compression to amqp plugins (#5830)

This commit is contained in:
Daniel Nelson 2019-05-20 14:36:23 -07:00 committed by GitHub
parent 1b2773a762
commit b5cd9a9ff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 250 additions and 12 deletions

122
internal/content_coding.go Normal file
View File

@ -0,0 +1,122 @@
package internal
import (
"bytes"
"compress/gzip"
"errors"
"io"
)
// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()
case "identity", "":
return NewIdentityEncoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// ContentEncoder applies a wrapper encoding to byte buffers.
type ContentEncoder interface {
Encode([]byte) ([]byte, error)
}
// GzipEncoder compresses the buffer using gzip at the default level.
type GzipEncoder struct {
writer *gzip.Writer
buf *bytes.Buffer
}
func NewGzipEncoder() (*GzipEncoder, error) {
var buf bytes.Buffer
return &GzipEncoder{
writer: gzip.NewWriter(&buf),
buf: &buf,
}, nil
}
func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
e.buf.Reset()
e.writer.Reset(e.buf)
_, err := e.writer.Write(data)
if err != nil {
return nil, err
}
err = e.writer.Close()
if err != nil {
return nil, err
}
return e.buf.Bytes(), nil
}
// IdentityEncoder is a null encoder that applies no transformation.
type IdentityEncoder struct{}
func NewIdentityEncoder() *IdentityEncoder {
return &IdentityEncoder{}
}
func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
return data, nil
}
// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
Decode([]byte) ([]byte, error)
}
// GzipDecoder decompresses buffers with gzip compression.
type GzipDecoder struct {
reader *gzip.Reader
buf *bytes.Buffer
}
func NewGzipDecoder() (*GzipDecoder, error) {
return &GzipDecoder{
reader: new(gzip.Reader),
buf: new(bytes.Buffer),
}, nil
}
func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()
_, err := d.buf.ReadFrom(d.reader)
if err != nil && err != io.EOF {
return nil, err
}
err = d.reader.Close()
if err != nil {
return nil, err
}
return d.buf.Bytes(), nil
}
// IdentityDecoder is a null decoder that returns the input.
type IdentityDecoder struct{}
func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}
func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}

View File

@ -0,0 +1,58 @@
package internal
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestGzipEncodeDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
actual, err := dec.Decode(payload)
require.NoError(t, err)
require.Equal(t, "howdy", string(actual))
}
func TestGzipReuse(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
actual, err := dec.Decode(payload)
require.NoError(t, err)
require.Equal(t, "howdy", string(actual))
payload, err = enc.Encode([]byte("doody"))
require.NoError(t, err)
actual, err = dec.Decode(payload)
require.NoError(t, err)
require.Equal(t, "doody", string(actual))
}
func TestIdentityEncodeDecode(t *testing.T) {
enc := NewIdentityEncoder()
dec := NewIdentityDecoder()
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
actual, err := dec.Decode(payload)
require.NoError(t, err)
require.Equal(t, "howdy", string(actual))
}

View File

@ -77,6 +77,10 @@ The following defaults are known to work with RabbitMQ:
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
@ -52,12 +53,15 @@ type AMQPConsumer struct {
AuthMethod string AuthMethod string
tls.ClientConfig tls.ClientConfig
ContentEncoding string `toml:"content_encoding"`
deliveries map[telegraf.TrackingID]amqp.Delivery deliveries map[telegraf.TrackingID]amqp.Delivery
parser parsers.Parser parser parsers.Parser
conn *amqp.Connection conn *amqp.Connection
wg *sync.WaitGroup wg *sync.WaitGroup
cancel context.CancelFunc cancel context.CancelFunc
decoder internal.ContentDecoder
} }
type externalAuth struct{} type externalAuth struct{}
@ -147,6 +151,10 @@ func (a *AMQPConsumer) SampleConfig() string {
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:
@ -201,6 +209,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return err return err
} }
a.decoder, err = internal.NewContentDecoder(a.ContentEncoding)
if err != nil {
return err
}
msgs, err := a.connect(amqpConf) msgs, err := a.connect(amqpConf)
if err != nil { if err != nil {
return err return err
@ -428,8 +441,7 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
} }
func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error { func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
metrics, err := a.parser.Parse(d.Body) onError := func() {
if err != nil {
// Discard the message from the queue; will never be able to process // Discard the message from the queue; will never be able to process
// this message. // this message.
rejErr := d.Ack(false) rejErr := d.Ack(false)
@ -438,6 +450,17 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
d.DeliveryTag, rejErr) d.DeliveryTag, rejErr)
a.conn.Close() a.conn.Close()
} }
}
body, err := a.decoder.Decode(d.Body)
if err != nil {
onError()
return err
}
metrics, err := a.parser.Parse(body)
if err != nil {
onError()
return err return err
} }

View File

@ -92,6 +92,14 @@ For an introduction to AMQP see:
## Recommended to set to true. ## Recommended to set to true.
# use_batch_format = false # use_batch_format = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"
## Data format to output. ## Data format to output.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -54,6 +54,7 @@ type AMQP struct {
Headers map[string]string `toml:"headers"` Headers map[string]string `toml:"headers"`
Timeout internal.Duration `toml:"timeout"` Timeout internal.Duration `toml:"timeout"`
UseBatchFormat bool `toml:"use_batch_format"` UseBatchFormat bool `toml:"use_batch_format"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig tls.ClientConfig
serializer serializers.Serializer serializer serializers.Serializer
@ -61,6 +62,7 @@ type AMQP struct {
client Client client Client
config *ClientConfig config *ClientConfig
sentMessages int sentMessages int
encoder internal.ContentEncoder
} }
type Client interface { type Client interface {
@ -149,6 +151,14 @@ var sampleConfig = `
## Recommended to set to true. ## Recommended to set to true.
# use_batch_format = false # use_batch_format = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"
## Data format to output. ## Data format to output.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:
@ -177,11 +187,16 @@ func (q *AMQP) Connect() error {
q.config = config q.config = config
} }
client, err := q.connect(q.config) var err error
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
if err != nil {
return err
}
q.client, err = q.connect(q.config)
if err != nil { if err != nil {
return err return err
} }
q.client = client
return nil return nil
} }
@ -227,6 +242,11 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
return err return err
} }
body, err = q.encoder.Encode(body)
if err != nil {
return err
}
err = q.publish(key, body) err = q.publish(key, body)
if err != nil { if err != nil {
// If this is the first attempt to publish and the connection is // If this is the first attempt to publish and the connection is
@ -298,6 +318,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
exchange: q.Exchange, exchange: q.Exchange,
exchangeType: q.ExchangeType, exchangeType: q.ExchangeType,
exchangePassive: q.ExchangePassive, exchangePassive: q.ExchangePassive,
encoding: q.ContentEncoding,
timeout: q.Timeout.Duration, timeout: q.Timeout.Duration,
} }

View File

@ -19,6 +19,7 @@ type ClientConfig struct {
exchangePassive bool exchangePassive bool
exchangeDurable bool exchangeDurable bool
exchangeArguments amqp.Table exchangeArguments amqp.Table
encoding string
headers amqp.Table headers amqp.Table
deliveryMode uint8 deliveryMode uint8
tlsConfig *tls.Config tlsConfig *tls.Config
@ -114,10 +115,11 @@ func (c *client) Publish(key string, body []byte) error {
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
Headers: c.config.headers, Headers: c.config.headers,
ContentType: "text/plain", ContentType: "text/plain",
Body: body, ContentEncoding: c.config.encoding,
DeliveryMode: c.config.deliveryMode, Body: body,
DeliveryMode: c.config.deliveryMode,
}) })
} }