telegraf/plugins/outputs/cloud_pubsub/topic_stubbed.go

200 lines
4.2 KiB
Go

package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"google.golang.org/api/support/bundler"
"runtime"
"sync"
"testing"
)
const (
errMockFail = "this is an error"
)
type (
testMetric struct {
m telegraf.Metric
returnErr bool
}
bundledMsg struct {
*pubsub.Message
*stubResult
}
stubResult struct {
metricIds []string
sendError bool
err chan error
done chan struct{}
}
stubTopic struct {
Settings pubsub.PublishSettings
ReturnErr map[string]bool
parsers.Parser
*testing.T
stopped bool
pLock sync.Mutex
published map[string]*pubsub.Message
bundler *bundler.Bundler
bLock sync.Mutex
bundleCount int
}
)
func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) {
s, _ := serializers.NewInfluxSerializer()
metrics := make([]telegraf.Metric, len(testM))
t := &stubTopic{
T: tT,
ReturnErr: make(map[string]bool),
published: make(map[string]*pubsub.Message),
}
for i, tm := range testM {
metrics[i] = tm.m
if tm.returnErr {
v, _ := tm.m.GetField("value")
t.ReturnErr[v.(string)] = true
}
}
ps := &PubSub{
Project: "test-project",
Topic: "test-topic",
stubTopic: func(string) topic { return t },
PublishCountThreshold: settings.CountThreshold,
PublishByteThreshold: settings.ByteThreshold,
PublishNumGoroutines: settings.NumGoroutines,
PublishTimeout: internal.Duration{Duration: settings.Timeout},
}
ps.SetSerializer(s)
return ps, t, metrics
}
func (t *stubTopic) ID() string {
return "test-topic"
}
func (t *stubTopic) Stop() {
t.pLock.Lock()
defer t.pLock.Unlock()
t.stopped = true
t.bundler.Flush()
}
func (t *stubTopic) Publish(ctx context.Context, msg *pubsub.Message) publishResult {
t.pLock.Lock()
defer t.pLock.Unlock()
if t.stopped || ctx.Err() != nil {
t.Fatalf("publish called after stop")
}
ids := t.parseIDs(msg)
r := &stubResult{
metricIds: ids,
err: make(chan error, 1),
done: make(chan struct{}, 1),
}
for _, id := range ids {
_, ok := t.ReturnErr[id]
r.sendError = r.sendError || ok
}
bundled := &bundledMsg{msg, r}
err := t.bundler.Add(bundled, len(msg.Data))
if err != nil {
t.Fatalf("unexpected error while adding to bundle: %v", err)
}
return r
}
func (t *stubTopic) PublishSettings() pubsub.PublishSettings {
return t.Settings
}
func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) {
t.Settings = settings
t.initBundler()
}
func (t *stubTopic) initBundler() *stubTopic {
t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle())
t.bundler.DelayThreshold = t.Settings.DelayThreshold
t.bundler.BundleCountThreshold = t.Settings.CountThreshold
if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount {
t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount
}
t.bundler.BundleByteThreshold = t.Settings.ByteThreshold
t.bundler.BundleByteLimit = pubsub.MaxPublishRequestBytes
t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0)
return t
}
func (t *stubTopic) sendBundle() func(items interface{}) {
return func(items interface{}) {
t.bLock.Lock()
defer t.bLock.Unlock()
bundled := items.([]*bundledMsg)
for _, msg := range bundled {
r := msg.stubResult
if r.sendError {
r.err <- errors.New(errMockFail)
} else {
r.done <- struct{}{}
}
for _, id := range r.metricIds {
t.published[id] = msg.Message
}
}
t.bundleCount++
}
}
func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser()
metrics, err := p.Parse(msg.Data)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
ids := make([]string, len(metrics))
for i, met := range metrics {
id, _ := met.GetField("value")
ids[i] = id.(string)
}
return ids
}
func (r *stubResult) Get(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case err := <-r.err:
return "", err
case <-r.done:
return fmt.Sprintf("id-%s", r.metricIds[0]), nil
}
}