diff --git a/plugins/outputs/cloud_pubsub/pubsub.go b/plugins/outputs/cloud_pubsub/pubsub.go index 9811af1d7..bc81bf580 100644 --- a/plugins/outputs/cloud_pubsub/pubsub.go +++ b/plugins/outputs/cloud_pubsub/pubsub.go @@ -1,16 +1,17 @@ package cloud_pubsub import ( - "cloud.google.com/go/pubsub" "context" "fmt" + "sync" + + "cloud.google.com/go/pubsub" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" "golang.org/x/oauth2/google" "google.golang.org/api/option" - "sync" ) const sampleConfig = ` @@ -28,9 +29,9 @@ const sampleConfig = ` ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" - ## Optional. Filepath for GCP credentials JSON file to authorize calls to - ## PubSub APIs. If not set explicitly, Telegraf will attempt to use - ## Application Default Credentials, which is preferred. + ## Optional. Filepath for GCP credentials JSON file to authorize calls to + ## PubSub APIs. If not set explicitly, Telegraf will attempt to use + ## Application Default Credentials, which is preferred. # credentials_file = "path/to/my/creds.json" ## Optional. If true, will send all metrics per write in one PubSub message. @@ -55,7 +56,7 @@ const sampleConfig = ` ## Optional. Specifies a timeout for requests to the PubSub API. # publish_timeout = "30s" - + ## Optional. PubSub attributes to add to metrics. # [[inputs.pubsub.attributes]] # my_attr = "tag_value" diff --git a/plugins/outputs/cloud_pubsub/pubsub_test.go b/plugins/outputs/cloud_pubsub/pubsub_test.go index a60f05eb0..eb993b37c 100644 --- a/plugins/outputs/cloud_pubsub/pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/pubsub_test.go @@ -1,12 +1,13 @@ package cloud_pubsub import ( + "testing" + "cloud.google.com/go/pubsub" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" - "testing" ) func TestPubSub_WriteSingle(t *testing.T) { diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index fdae70bc3..55f2e5a0a 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -1,18 +1,20 @@ package cloud_pubsub import ( - "cloud.google.com/go/pubsub" "context" "errors" "fmt" + "runtime" + "sync" + "testing" + "time" + + "cloud.google.com/go/pubsub" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/serializers" "google.golang.org/api/support/bundler" - "runtime" - "sync" - "testing" ) const ( @@ -138,7 +140,7 @@ func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) { func (t *stubTopic) initBundler() *stubTopic { t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle()) - t.bundler.DelayThreshold = t.Settings.DelayThreshold + t.bundler.DelayThreshold = 10 * time.Second t.bundler.BundleCountThreshold = t.Settings.CountThreshold if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount { t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount @@ -159,14 +161,15 @@ func (t *stubTopic) sendBundle() func(items interface{}) { for _, msg := range bundled { r := msg.stubResult + for _, id := range r.metricIds { + t.published[id] = msg.Message + } + if r.sendError { r.err <- errors.New(errMockFail) } else { r.done <- struct{}{} } - for _, id := range r.metricIds { - t.published[id] = msg.Message - } } t.bundleCount++