Retry restarting receiver on PubSub service error (#5458)

This commit is contained in:
emily 2019-02-20 17:33:56 -08:00 committed by Daniel Nelson
parent c234ba291e
commit 0a01713bcc
4 changed files with 132 additions and 32 deletions

View File

@ -26,6 +26,12 @@ and creates metrics using one of the supported [input data formats][].
## Application Default Credentials, which is preferred. ## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json" # credentials_file = "path/to/my/creds.json"
## Optional. Number of seconds to wait before attempting to restart the
## PubSub subscription receiver after an unexpected error.
## If the streaming pull for a PubSub Subscription fails (receiver),
## the agent attempts to restart receiving messages after this many seconds.
# retry_delay_seconds = 5
## Optional. Maximum byte length of a message to consume. ## Optional. Maximum byte length of a message to consume.
## Larger messages are dropped with an error. If less than 0 or unspecified, ## Larger messages are dropped with an error. If less than 0 or unspecified,
## treated as no limit. ## treated as no limit.

View File

@ -12,14 +12,19 @@ import (
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
"google.golang.org/api/option" "google.golang.org/api/option"
"log"
"time"
) )
type empty struct{} type empty struct{}
type semaphore chan empty type semaphore chan empty
const defaultMaxUndeliveredMessages = 1000 const defaultMaxUndeliveredMessages = 1000
const defaultRetryDelaySeconds = 5
type PubSub struct { type PubSub struct {
sync.Mutex
CredentialsFile string `toml:"credentials_file"` CredentialsFile string `toml:"credentials_file"`
Project string `toml:"project"` Project string `toml:"project"`
Subscription string `toml:"subscription"` Subscription string `toml:"subscription"`
@ -33,6 +38,7 @@ type PubSub struct {
// Agent settings // Agent settings
MaxMessageLen int `toml:"max_message_len"` MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"`
sub subscription sub subscription
stubSub func() subscription stubSub func() subscription
@ -42,7 +48,6 @@ type PubSub struct {
parser parsers.Parser parser parsers.Parser
wg *sync.WaitGroup wg *sync.WaitGroup
acc telegraf.TrackingAccumulator acc telegraf.TrackingAccumulator
mu sync.Mutex
undelivered map[telegraf.TrackingID]message undelivered map[telegraf.TrackingID]message
sem semaphore sem semaphore
@ -78,35 +83,36 @@ func (ps *PubSub) Start(ac telegraf.Accumulator) error {
return fmt.Errorf(`"project" is required`) return fmt.Errorf(`"project" is required`)
} }
cctx, cancel := context.WithCancel(context.Background()) ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)
// Create top-level context with cancel that will be called on Stop().
ctx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel ps.cancel = cancel
if ps.stubSub != nil { if ps.stubSub != nil {
ps.sub = ps.stubSub() ps.sub = ps.stubSub()
} else { } else {
subRef, err := ps.getGCPSubscription(cctx, ps.Subscription) subRef, err := ps.getGCPSubscription(ps.Subscription)
if err != nil { if err != nil {
return err return fmt.Errorf("unable to create subscription handle: %v", err)
} }
ps.sub = subRef ps.sub = subRef
} }
ps.wg = &sync.WaitGroup{} ps.wg = &sync.WaitGroup{}
ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)
ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
// Start receiver in new goroutine for each subscription.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
ps.subReceive(cctx)
}()
// Start goroutine to handle delivery notifications from accumulator. // Start goroutine to handle delivery notifications from accumulator.
ps.wg.Add(1) ps.wg.Add(1)
go func() { go func() {
defer ps.wg.Done() defer ps.wg.Done()
ps.receiveDelivered(cctx) ps.waitForDelivery(ctx)
}()
// Start goroutine for subscription receiver.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
ps.receiveWithRetry(ctx)
}() }()
return nil return nil
@ -119,13 +125,41 @@ func (ps *PubSub) Stop() {
ps.wg.Wait() ps.wg.Wait()
} }
func (ps *PubSub) subReceive(cctx context.Context) { // startReceiver is called within a goroutine and manages keeping a
// subscription.Receive() up and running while the plugin has not been stopped.
func (ps *PubSub) receiveWithRetry(parentCtx context.Context) {
err := ps.startReceiver(parentCtx)
for err != nil && parentCtx.Err() == nil {
log.Printf("E! [inputs.cloud_pubsub] Receiver for subscription %s exited with error: %v", ps.sub.ID(), err)
delay := defaultRetryDelaySeconds
if ps.RetryReceiveDelaySeconds > 0 {
delay = ps.RetryReceiveDelaySeconds
}
log.Printf("I! [inputs.cloud_pubsub] Waiting %d seconds before attempting to restart receiver...", delay)
time.Sleep(time.Duration(delay) * time.Second)
err = ps.startReceiver(parentCtx)
}
}
func (ps *PubSub) startReceiver(parentCtx context.Context) error {
log.Printf("I! [inputs.cloud_pubsub] Starting receiver for subscription %s...", ps.sub.ID())
cctx, ccancel := context.WithCancel(parentCtx)
err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) { err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) {
if err := ps.onMessage(ctx, msg); err != nil { if err := ps.onMessage(ctx, msg); err != nil {
ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err)) ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err))
} }
}) })
if err != nil {
ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err)) ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err))
} else {
log.Printf("I! [inputs.cloud_pubsub] subscription pull ended (no error, most likely stopped)")
}
ccancel()
return err
} }
// onMessage handles parsing and adding a received message to the accumulator. // onMessage handles parsing and adding a received message to the accumulator.
@ -153,8 +187,8 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
break break
} }
ps.mu.Lock() ps.Lock()
defer ps.mu.Unlock() defer ps.Unlock()
id := ps.acc.AddTrackingMetricGroup(metrics) id := ps.acc.AddTrackingMetricGroup(metrics)
if ps.undelivered == nil { if ps.undelivered == nil {
@ -165,10 +199,10 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return nil return nil
} }
func (ps *PubSub) receiveDelivered(ctx context.Context) { func (ps *PubSub) waitForDelivery(parentCtx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-parentCtx.Done():
return return
case info := <-ps.acc.Delivered(): case info := <-ps.acc.Delivered():
<-ps.sem <-ps.sem
@ -182,8 +216,8 @@ func (ps *PubSub) receiveDelivered(ctx context.Context) {
} }
func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message { func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message {
ps.mu.Lock() ps.Lock()
defer ps.mu.Unlock() defer ps.Unlock()
msg, ok := ps.undelivered[id] msg, ok := ps.undelivered[id]
if !ok { if !ok {
@ -219,7 +253,7 @@ func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) {
return client, nil return client, nil
} }
func (ps *PubSub) getGCPSubscription(ctx context.Context, subId string) (subscription, error) { func (ps *PubSub) getGCPSubscription(subId string) (subscription, error) {
client, err := ps.getPubSubClient() client, err := ps.getPubSubClient()
if err != nil { if err != nil {
return nil, err return nil, err
@ -262,6 +296,12 @@ const sampleConfig = `
## Application Default Credentials, which is preferred. ## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json" # credentials_file = "path/to/my/creds.json"
## Optional. Number of seconds to wait before attempting to restart the
## PubSub subscription receiver after an unexpected error.
## If the streaming pull for a PubSub Subscription fails (receiver),
## the agent attempts to restart receiving messages after this many seconds.
# retry_delay_seconds = 5
## Optional. Maximum byte length of a message to consume. ## Optional. Maximum byte length of a message to consume.
## Larger messages are dropped with an error. If less than 0 or unspecified, ## Larger messages are dropped with an error. If less than 0 or unspecified,
## treated as no limit. ## treated as no limit.

View File

@ -1,6 +1,7 @@
package cloud_pubsub package cloud_pubsub
import ( import (
"errors"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -21,6 +22,7 @@ func TestRunParse(t *testing.T) {
id: subId, id: subId,
messages: make(chan *testMsg, 100), messages: make(chan *testMsg, 100),
} }
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{ ps := &PubSub{
parser: testParser, parser: testParser,
@ -62,6 +64,7 @@ func TestRunInvalidMessages(t *testing.T) {
id: subId, id: subId,
messages: make(chan *testMsg, 100), messages: make(chan *testMsg, 100),
} }
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{ ps := &PubSub{
parser: testParser, parser: testParser,
@ -107,6 +110,7 @@ func TestRunOverlongMessages(t *testing.T) {
id: subId, id: subId,
messages: make(chan *testMsg, 100), messages: make(chan *testMsg, 100),
} }
sub.receiver = testMessagesReceive(sub)
ps := &PubSub{ ps := &PubSub{
parser: testParser, parser: testParser,
@ -141,6 +145,41 @@ func TestRunOverlongMessages(t *testing.T) {
assert.Equal(t, acc.NFields(), 0) assert.Equal(t, acc.NFields(), 0)
} }
func TestRunErrorInSubscriber(t *testing.T) {
subId := "sub-unexpected-error"
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
fakeErrStr := "a fake error"
sub.receiver = testMessagesError(sub, errors.New("a fake error"))
ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
RetryReceiveDelaySeconds: 1,
}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
acc.WaitError(1)
assert.Regexp(t, fakeErrStr, acc.Errors[0])
}
func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) { func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) {
assert.Equal(t, "cpu_load_short", m.Measurement) assert.Equal(t, "cpu_load_short", m.Measurement)
assert.Equal(t, "server01", m.Tags["host"]) assert.Equal(t, "server01", m.Tags["host"])

View File

@ -9,6 +9,7 @@ import (
type stubSub struct { type stubSub struct {
id string id string
messages chan *testMsg messages chan *testMsg
receiver receiveFunc
} }
func (s *stubSub) ID() string { func (s *stubSub) ID() string {
@ -16,6 +17,19 @@ func (s *stubSub) ID() string {
} }
func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message)) error { func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message)) error {
return s.receiver(ctx, f)
}
type receiveFunc func(ctx context.Context, f func(context.Context, message)) error
func testMessagesError(s *stubSub, expectedErr error) receiveFunc {
return func(ctx context.Context, f func(context.Context, message)) error {
return expectedErr
}
}
func testMessagesReceive(s *stubSub) receiveFunc {
return func(ctx context.Context, f func(context.Context, message)) error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -24,6 +38,7 @@ func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message))
f(ctx, m) f(ctx, m)
} }
} }
}
} }
type testMsg struct { type testMsg struct {