diff --git a/plugins/inputs/cloud_pubsub/README.md b/plugins/inputs/cloud_pubsub/README.md index 6bf3fa29e..460cf4b82 100644 --- a/plugins/inputs/cloud_pubsub/README.md +++ b/plugins/inputs/cloud_pubsub/README.md @@ -75,6 +75,11 @@ and creates metrics using one of the supported [input data formats][]. ## 1. Note this setting does not limit the number of messages that can be ## processed concurrently (use "max_outstanding_messages" instead). # max_receiver_go_routines = 0 + + ## Optional. If true, Telegraf will attempt to base64 decode the + ## PubSub message data before parsing. Many GCP services that + ## output JSON to Google PubSub base64-encode the JSON payload. + # base64_data = false ``` ### Multiple Subscriptions and Topics diff --git a/plugins/inputs/cloud_pubsub/pubsub.go b/plugins/inputs/cloud_pubsub/pubsub.go index 9f7125126..845711e7d 100644 --- a/plugins/inputs/cloud_pubsub/pubsub.go +++ b/plugins/inputs/cloud_pubsub/pubsub.go @@ -6,6 +6,7 @@ import ( "sync" "cloud.google.com/go/pubsub" + "encoding/base64" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" @@ -40,6 +41,8 @@ type PubSub struct { MaxUndeliveredMessages int `toml:"max_undelivered_messages"` RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"` + Base64Data bool `toml:"base64_data"` + sub subscription stubSub func() subscription @@ -169,7 +172,18 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) } - metrics, err := ps.parser.Parse(msg.Data()) + var data []byte + if ps.Base64Data { + strData, err := base64.StdEncoding.DecodeString(string(msg.Data())) + if err != nil { + return fmt.Errorf("unable to base64 decode message: %v", err) + } + data = []byte(strData) + } else { + data = msg.Data() + } + + metrics, err := ps.parser.Parse(data) if err != nil { msg.Ack() return err @@ -345,4 +359,8 @@ const sampleConfig = ` ## 1. Note this setting does not limit the number of messages that can be ## processed concurrently (use "max_outstanding_messages" instead). # max_receiver_go_routines = 0 + + ## Optional. If true, Telegraf will attempt to base64 decode the + ## PubSub message data before parsing + # base64_data = false ` diff --git a/plugins/inputs/cloud_pubsub/pubsub_test.go b/plugins/inputs/cloud_pubsub/pubsub_test.go index be6070d15..6233546aa 100644 --- a/plugins/inputs/cloud_pubsub/pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/pubsub_test.go @@ -1,6 +1,7 @@ package cloud_pubsub import ( + "encoding/base64" "errors" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -55,6 +56,50 @@ func TestRunParse(t *testing.T) { validateTestInfluxMetric(t, metric) } +// Test ingesting InfluxDB-format PubSub message +func TestRunBase64(t *testing.T) { + subId := "sub-run-base64" + + testParser, _ := parsers.NewInfluxParser() + + sub := &stubSub{ + id: subId, + messages: make(chan *testMsg, 100), + } + sub.receiver = testMessagesReceive(sub) + + ps := &PubSub{ + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subId, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + Base64Data: true, + } + + acc := &testutil.Accumulator{} + if err := ps.Start(acc); err != nil { + t.Fatalf("test PubSub failed to start: %s", err) + } + defer ps.Stop() + + if ps.sub == nil { + t.Fatal("expected plugin subscription to be non-nil") + } + + testTracker := &testTracker{} + msg := &testMsg{ + value: base64.StdEncoding.EncodeToString([]byte(msgInflux)), + tracker: testTracker, + } + sub.messages <- msg + + acc.Wait(1) + assert.Equal(t, acc.NFields(), 1) + metric := acc.Metrics[0] + validateTestInfluxMetric(t, metric) +} + func TestRunInvalidMessages(t *testing.T) { subId := "sub-invalid-messages" diff --git a/plugins/outputs/cloud_pubsub/README.md b/plugins/outputs/cloud_pubsub/README.md index 873f3c9b3..3a4088b61 100644 --- a/plugins/outputs/cloud_pubsub/README.md +++ b/plugins/outputs/cloud_pubsub/README.md @@ -52,6 +52,9 @@ generate it using `telegraf --usage cloud_pubsub`. ## Optional. Specifies a timeout for requests to the PubSub API. # publish_timeout = "30s" + ## Optional. If true, published PubSub message data will be base64-encoded. + # base64_data = false + ## Optional. PubSub attributes to add to metrics. # [[inputs.pubsub.attributes]] # my_attr = "tag_value" diff --git a/plugins/outputs/cloud_pubsub/pubsub.go b/plugins/outputs/cloud_pubsub/pubsub.go index ee1611d3f..c8fbf242d 100644 --- a/plugins/outputs/cloud_pubsub/pubsub.go +++ b/plugins/outputs/cloud_pubsub/pubsub.go @@ -6,6 +6,7 @@ import ( "sync" "cloud.google.com/go/pubsub" + "encoding/base64" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" @@ -56,6 +57,9 @@ const sampleConfig = ` ## Optional. Specifies a timeout for requests to the PubSub API. # publish_timeout = "30s" + ## Optional. If true, published PubSub message data will be base64-encoded. + # base64_data = false + ## Optional. PubSub attributes to add to metrics. # [[inputs.pubsub.attributes]] # my_attr = "tag_value" @@ -72,6 +76,7 @@ type PubSub struct { PublishByteThreshold int `toml:"publish_byte_threshold"` PublishNumGoroutines int `toml:"publish_num_go_routines"` PublishTimeout internal.Duration `toml:"publish_timeout"` + Base64Data bool `toml:"base64_data"` t topic c *pubsub.Client @@ -207,6 +212,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro if err != nil { return nil, err } + + if ps.Base64Data { + encoded := base64.StdEncoding.EncodeToString(b) + b = []byte(encoded) + } + msg := &pubsub.Message{Data: b} if ps.Attributes != nil { msg.Attributes = ps.Attributes @@ -220,6 +231,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro if err != nil { return nil, err } + + if ps.Base64Data { + encoded := base64.StdEncoding.EncodeToString(b) + b = []byte(encoded) + } + msgs[i] = &pubsub.Message{ Data: b, } diff --git a/plugins/outputs/cloud_pubsub/pubsub_test.go b/plugins/outputs/cloud_pubsub/pubsub_test.go index eb993b37c..76eb518d7 100644 --- a/plugins/outputs/cloud_pubsub/pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/pubsub_test.go @@ -4,6 +4,7 @@ import ( "testing" "cloud.google.com/go/pubsub" + "encoding/base64" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -26,7 +27,7 @@ func TestPubSub_WriteSingle(t *testing.T) { } for _, testM := range testMetrics { - verifyMetricPublished(t, testM.m, topic.published) + verifyRawMetricPublished(t, testM.m, topic.published) } } @@ -48,7 +49,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) { } for _, testM := range testMetrics { - msg := verifyMetricPublished(t, testM.m, topic.published) + msg := verifyRawMetricPublished(t, testM.m, topic.published) assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1") assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2") } @@ -70,7 +71,7 @@ func TestPubSub_WriteMultiple(t *testing.T) { } for _, testM := range testMetrics { - verifyMetricPublished(t, testM.m, topic.published) + verifyRawMetricPublished(t, testM.m, topic.published) } assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count") } @@ -94,7 +95,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) { } for _, testM := range testMetrics { - verifyMetricPublished(t, testM.m, topic.published) + verifyRawMetricPublished(t, testM.m, topic.published) } assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") } @@ -117,11 +118,33 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) { } for _, testM := range testMetrics { - verifyMetricPublished(t, testM.m, topic.published) + verifyRawMetricPublished(t, testM.m, topic.published) } assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") } +func TestPubSub_WriteBase64Single(t *testing.T) { + + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error */}, + {testutil.TestMetric("value_2", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + settings.CountThreshold = 1 + ps, topic, metrics := getTestResources(t, settings, testMetrics) + ps.Base64Data = true + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */) + } +} + func TestPubSub_Error(t *testing.T) { testMetrics := []testMetric{ // Force this batch to return error @@ -141,7 +164,11 @@ func TestPubSub_Error(t *testing.T) { } } -func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { +func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { + return verifyMetricPublished(t, m, published, false) +} + +func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message { p, _ := parsers.NewInfluxParser() v, _ := m.GetField("value") @@ -150,7 +177,16 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string t.Fatalf("expected metric to get published (value: %s)", v.(string)) } - parsed, err := p.Parse(psMsg.Data) + data := psMsg.Data + if base64Encoded { + v, err := base64.StdEncoding.DecodeString(string(psMsg.Data)) + if err != nil { + t.Fatalf("Unable to decode expected base64-encoded message: %s", err) + } + data = []byte(v) + } + + parsed, err := p.Parse(data) if err != nil { t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data)) } diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index 55f2e5a0a..d78d4fbd4 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -10,6 +10,7 @@ import ( "time" "cloud.google.com/go/pubsub" + "encoding/base64" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" @@ -180,8 +181,17 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { p, _ := parsers.NewInfluxParser() metrics, err := p.Parse(msg.Data) if err != nil { - t.Fatalf("unexpected parsing error: %v", err) + // Just attempt to base64-decode first before returning error. + d, err := base64.StdEncoding.DecodeString(string(msg.Data)) + if err != nil { + t.Errorf("unable to base64-decode potential test message: %v", err) + } + metrics, err = p.Parse(d) + if err != nil { + t.Fatalf("unexpected parsing error: %v", err) + } } + ids := make([]string, len(metrics)) for i, met := range metrics { id, _ := met.GetField("value")