Fix intermittent test cases in cloud_pubsub (#5271)
This commit is contained in:
parent
4b3580cceb
commit
e20ba1e2b6
|
@ -1,16 +1,17 @@
|
||||||
package cloud_pubsub
|
package cloud_pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cloud.google.com/go/pubsub"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"cloud.google.com/go/pubsub"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"golang.org/x/oauth2/google"
|
"golang.org/x/oauth2/google"
|
||||||
"google.golang.org/api/option"
|
"google.golang.org/api/option"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
|
@ -28,9 +29,9 @@ const sampleConfig = `
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
data_format = "influx"
|
data_format = "influx"
|
||||||
|
|
||||||
## Optional. Filepath for GCP credentials JSON file to authorize calls to
|
## Optional. Filepath for GCP credentials JSON file to authorize calls to
|
||||||
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
|
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
|
||||||
## Application Default Credentials, which is preferred.
|
## Application Default Credentials, which is preferred.
|
||||||
# credentials_file = "path/to/my/creds.json"
|
# credentials_file = "path/to/my/creds.json"
|
||||||
|
|
||||||
## Optional. If true, will send all metrics per write in one PubSub message.
|
## Optional. If true, will send all metrics per write in one PubSub message.
|
||||||
|
@ -55,7 +56,7 @@ const sampleConfig = `
|
||||||
|
|
||||||
## Optional. Specifies a timeout for requests to the PubSub API.
|
## Optional. Specifies a timeout for requests to the PubSub API.
|
||||||
# publish_timeout = "30s"
|
# publish_timeout = "30s"
|
||||||
|
|
||||||
## Optional. PubSub attributes to add to metrics.
|
## Optional. PubSub attributes to add to metrics.
|
||||||
# [[inputs.pubsub.attributes]]
|
# [[inputs.pubsub.attributes]]
|
||||||
# my_attr = "tag_value"
|
# my_attr = "tag_value"
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package cloud_pubsub
|
package cloud_pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
"cloud.google.com/go/pubsub"
|
"cloud.google.com/go/pubsub"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPubSub_WriteSingle(t *testing.T) {
|
func TestPubSub_WriteSingle(t *testing.T) {
|
||||||
|
|
|
@ -1,18 +1,20 @@
|
||||||
package cloud_pubsub
|
package cloud_pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cloud.google.com/go/pubsub"
|
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"cloud.google.com/go/pubsub"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"google.golang.org/api/support/bundler"
|
"google.golang.org/api/support/bundler"
|
||||||
"runtime"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -138,7 +140,7 @@ func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) {
|
||||||
|
|
||||||
func (t *stubTopic) initBundler() *stubTopic {
|
func (t *stubTopic) initBundler() *stubTopic {
|
||||||
t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle())
|
t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle())
|
||||||
t.bundler.DelayThreshold = t.Settings.DelayThreshold
|
t.bundler.DelayThreshold = 10 * time.Second
|
||||||
t.bundler.BundleCountThreshold = t.Settings.CountThreshold
|
t.bundler.BundleCountThreshold = t.Settings.CountThreshold
|
||||||
if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount {
|
if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount {
|
||||||
t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount
|
t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount
|
||||||
|
@ -159,14 +161,15 @@ func (t *stubTopic) sendBundle() func(items interface{}) {
|
||||||
|
|
||||||
for _, msg := range bundled {
|
for _, msg := range bundled {
|
||||||
r := msg.stubResult
|
r := msg.stubResult
|
||||||
|
for _, id := range r.metricIds {
|
||||||
|
t.published[id] = msg.Message
|
||||||
|
}
|
||||||
|
|
||||||
if r.sendError {
|
if r.sendError {
|
||||||
r.err <- errors.New(errMockFail)
|
r.err <- errors.New(errMockFail)
|
||||||
} else {
|
} else {
|
||||||
r.done <- struct{}{}
|
r.done <- struct{}{}
|
||||||
}
|
}
|
||||||
for _, id := range r.metricIds {
|
|
||||||
t.published[id] = msg.Message
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
t.bundleCount++
|
t.bundleCount++
|
||||||
|
|
Loading…
Reference in New Issue