Add Base64-encoding/decoding for Google Cloud PubSub plugins (#5543)

This commit is contained in:
emily 2019-03-06 17:34:47 -08:00 committed by Daniel Nelson
parent 51cc0fe6d8
commit dd67144660
7 changed files with 143 additions and 9 deletions

View File

@ -75,6 +75,11 @@ and creates metrics using one of the supported [input data formats][].
## 1. Note this setting does not limit the number of messages that can be ## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead). ## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0 # max_receiver_go_routines = 0
## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false
``` ```
### Multiple Subscriptions and Topics ### Multiple Subscriptions and Topics

View File

@ -6,6 +6,7 @@ import (
"sync" "sync"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -40,6 +41,8 @@ type PubSub struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"` RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"`
Base64Data bool `toml:"base64_data"`
sub subscription sub subscription
stubSub func() subscription stubSub func() subscription
@ -169,7 +172,18 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
} }
metrics, err := ps.parser.Parse(msg.Data()) var data []byte
if ps.Base64Data {
strData, err := base64.StdEncoding.DecodeString(string(msg.Data()))
if err != nil {
return fmt.Errorf("unable to base64 decode message: %v", err)
}
data = []byte(strData)
} else {
data = msg.Data()
}
metrics, err := ps.parser.Parse(data)
if err != nil { if err != nil {
msg.Ack() msg.Ack()
return err return err
@ -345,4 +359,8 @@ const sampleConfig = `
## 1. Note this setting does not limit the number of messages that can be ## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead). ## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0 # max_receiver_go_routines = 0
## Optional. If true, Telegraf will attempt to base64 decode the
## PubSub message data before parsing
# base64_data = false
` `

View File

@ -1,6 +1,7 @@
package cloud_pubsub package cloud_pubsub
import ( import (
"encoding/base64"
"errors" "errors"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -55,6 +56,50 @@ func TestRunParse(t *testing.T) {
validateTestInfluxMetric(t, metric) validateTestInfluxMetric(t, metric)
} }
// Test ingesting InfluxDB-format PubSub message
func TestRunBase64(t *testing.T) {
subId := "sub-run-base64"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: base64.StdEncoding.EncodeToString([]byte(msgInflux)),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunInvalidMessages(t *testing.T) { func TestRunInvalidMessages(t *testing.T) {
subId := "sub-invalid-messages" subId := "sub-invalid-messages"

View File

@ -52,6 +52,9 @@ generate it using `telegraf --usage cloud_pubsub`.
## Optional. Specifies a timeout for requests to the PubSub API. ## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s" # publish_timeout = "30s"
## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false
## Optional. PubSub attributes to add to metrics. ## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]] # [[inputs.pubsub.attributes]]
# my_attr = "tag_value" # my_attr = "tag_value"

View File

@ -6,6 +6,7 @@ import (
"sync" "sync"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
@ -56,6 +57,9 @@ const sampleConfig = `
## Optional. Specifies a timeout for requests to the PubSub API. ## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s" # publish_timeout = "30s"
## Optional. If true, published PubSub message data will be base64-encoded.
# base64_data = false
## Optional. PubSub attributes to add to metrics. ## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]] # [[inputs.pubsub.attributes]]
# my_attr = "tag_value" # my_attr = "tag_value"
@ -72,6 +76,7 @@ type PubSub struct {
PublishByteThreshold int `toml:"publish_byte_threshold"` PublishByteThreshold int `toml:"publish_byte_threshold"`
PublishNumGoroutines int `toml:"publish_num_go_routines"` PublishNumGoroutines int `toml:"publish_num_go_routines"`
PublishTimeout internal.Duration `toml:"publish_timeout"` PublishTimeout internal.Duration `toml:"publish_timeout"`
Base64Data bool `toml:"base64_data"`
t topic t topic
c *pubsub.Client c *pubsub.Client
@ -207,6 +212,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}
msg := &pubsub.Message{Data: b} msg := &pubsub.Message{Data: b}
if ps.Attributes != nil { if ps.Attributes != nil {
msg.Attributes = ps.Attributes msg.Attributes = ps.Attributes
@ -220,6 +231,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded)
}
msgs[i] = &pubsub.Message{ msgs[i] = &pubsub.Message{
Data: b, Data: b,
} }

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -26,7 +27,7 @@ func TestPubSub_WriteSingle(t *testing.T) {
} }
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
} }
@ -48,7 +49,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
} }
for _, testM := range testMetrics { for _, testM := range testMetrics {
msg := verifyMetricPublished(t, testM.m, topic.published) msg := verifyRawMetricPublished(t, testM.m, topic.published)
assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1") assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2") assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
} }
@ -70,7 +71,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
} }
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count")
} }
@ -94,7 +95,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
} }
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
} }
@ -117,11 +118,33 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
} }
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
} }
func TestPubSub_WriteBase64Single(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
{testutil.TestMetric("value_2", "test"), false},
}
settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
ps.Base64Data = true
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */)
}
}
func TestPubSub_Error(t *testing.T) { func TestPubSub_Error(t *testing.T) {
testMetrics := []testMetric{ testMetrics := []testMetric{
// Force this batch to return error // Force this batch to return error
@ -141,7 +164,11 @@ func TestPubSub_Error(t *testing.T) {
} }
} }
func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
return verifyMetricPublished(t, m, published, false)
}
func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message {
p, _ := parsers.NewInfluxParser() p, _ := parsers.NewInfluxParser()
v, _ := m.GetField("value") v, _ := m.GetField("value")
@ -150,7 +177,16 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
t.Fatalf("expected metric to get published (value: %s)", v.(string)) t.Fatalf("expected metric to get published (value: %s)", v.(string))
} }
parsed, err := p.Parse(psMsg.Data) data := psMsg.Data
if base64Encoded {
v, err := base64.StdEncoding.DecodeString(string(psMsg.Data))
if err != nil {
t.Fatalf("Unable to decode expected base64-encoded message: %s", err)
}
data = []byte(v)
}
parsed, err := p.Parse(data)
if err != nil { if err != nil {
t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data)) t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data))
} }

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"encoding/base64"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
@ -179,9 +180,18 @@ func (t *stubTopic) sendBundle() func(items interface{}) {
func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser() p, _ := parsers.NewInfluxParser()
metrics, err := p.Parse(msg.Data) metrics, err := p.Parse(msg.Data)
if err != nil {
// Just attempt to base64-decode first before returning error.
d, err := base64.StdEncoding.DecodeString(string(msg.Data))
if err != nil {
t.Errorf("unable to base64-decode potential test message: %v", err)
}
metrics, err = p.Parse(d)
if err != nil { if err != nil {
t.Fatalf("unexpected parsing error: %v", err) t.Fatalf("unexpected parsing error: %v", err)
} }
}
ids := make([]string, len(metrics)) ids := make([]string, len(metrics))
for i, met := range metrics { for i, met := range metrics {
id, _ := met.GetField("value") id, _ := met.GetField("value")