telegraf/plugins/inputs/cloud_pubsub/pubsub_test.go

240 lines
5.7 KiB
Go

package cloud_pubsub
import (
"encoding/base64"
"errors"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
const (
msgInflux = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
)
// Test ingesting InfluxDB-format PubSub message
func TestRunParse(t *testing.T) {
subId := "sub-run-parse"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
// Test ingesting InfluxDB-format PubSub message
func TestRunBase64(t *testing.T) {
subId := "sub-run-base64"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: base64.StdEncoding.EncodeToString([]byte(msgInflux)),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunInvalidMessages(t *testing.T) {
subId := "sub-invalid-messages"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: "~invalidInfluxMsg~",
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again.
testTracker.WaitForAck(1)
assert.Equal(t, acc.NFields(), 0)
}
func TestRunOverlongMessages(t *testing.T) {
subId := "sub-message-too-long"
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
// Add MaxMessageLen Param
MaxMessageLen: 1,
}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again.
testTracker.WaitForAck(1)
assert.Equal(t, acc.NFields(), 0)
}
func TestRunErrorInSubscriber(t *testing.T) {
subId := "sub-unexpected-error"
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
fakeErrStr := "a fake error"
sub.receiver = testMessagesError(sub, errors.New("a fake error"))
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
RetryReceiveDelaySeconds: 1,
}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
acc.WaitError(1)
assert.Regexp(t, fakeErrStr, acc.Errors[0])
}
func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) {
assert.Equal(t, "cpu_load_short", m.Measurement)
assert.Equal(t, "server01", m.Tags["host"])
assert.Equal(t, 23422.0, m.Fields["value"])
assert.Equal(t, int64(1422568543702900257), m.Time.UnixNano())
}