diff --git a/plugins/inputs/cloud_pubsub/README.md b/plugins/inputs/cloud_pubsub/README.md index eb08af105..6bf3fa29e 100644 --- a/plugins/inputs/cloud_pubsub/README.md +++ b/plugins/inputs/cloud_pubsub/README.md @@ -26,6 +26,12 @@ and creates metrics using one of the supported [input data formats][]. ## Application Default Credentials, which is preferred. # credentials_file = "path/to/my/creds.json" + ## Optional. Number of seconds to wait before attempting to restart the + ## PubSub subscription receiver after an unexpected error. + ## If the streaming pull for a PubSub Subscription fails (receiver), + ## the agent attempts to restart receiving messages after this many seconds. + # retry_delay_seconds = 5 + ## Optional. Maximum byte length of a message to consume. ## Larger messages are dropped with an error. If less than 0 or unspecified, ## treated as no limit. diff --git a/plugins/inputs/cloud_pubsub/pubsub.go b/plugins/inputs/cloud_pubsub/pubsub.go index 8c2b600b0..9f7125126 100644 --- a/plugins/inputs/cloud_pubsub/pubsub.go +++ b/plugins/inputs/cloud_pubsub/pubsub.go @@ -12,14 +12,19 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "golang.org/x/oauth2/google" "google.golang.org/api/option" + "log" + "time" ) type empty struct{} type semaphore chan empty const defaultMaxUndeliveredMessages = 1000 +const defaultRetryDelaySeconds = 5 type PubSub struct { + sync.Mutex + CredentialsFile string `toml:"credentials_file"` Project string `toml:"project"` Subscription string `toml:"subscription"` @@ -31,8 +36,9 @@ type PubSub struct { MaxReceiverGoRoutines int `toml:"max_receiver_go_routines"` // Agent settings - MaxMessageLen int `toml:"max_message_len"` - MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + MaxMessageLen int `toml:"max_message_len"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"` sub subscription stubSub func() subscription @@ -42,7 +48,6 @@ type PubSub 
struct { parser parsers.Parser wg *sync.WaitGroup acc telegraf.TrackingAccumulator - mu sync.Mutex undelivered map[telegraf.TrackingID]message sem semaphore @@ -78,35 +83,36 @@ func (ps *PubSub) Start(ac telegraf.Accumulator) error { return fmt.Errorf(`"project" is required`) } - cctx, cancel := context.WithCancel(context.Background()) + ps.sem = make(semaphore, ps.MaxUndeliveredMessages) + ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) + + // Create top-level context with cancel that will be called on Stop(). + ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel if ps.stubSub != nil { ps.sub = ps.stubSub() } else { - subRef, err := ps.getGCPSubscription(cctx, ps.Subscription) + subRef, err := ps.getGCPSubscription(ps.Subscription) if err != nil { - return err + return fmt.Errorf("unable to create subscription handle: %v", err) } ps.sub = subRef } ps.wg = &sync.WaitGroup{} - ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) - ps.sem = make(semaphore, ps.MaxUndeliveredMessages) - - // Start receiver in new goroutine for each subscription. - ps.wg.Add(1) - go func() { - defer ps.wg.Done() - ps.subReceive(cctx) - }() - // Start goroutine to handle delivery notifications from accumulator. ps.wg.Add(1) go func() { defer ps.wg.Done() - ps.receiveDelivered(cctx) + ps.waitForDelivery(ctx) + }() + + // Start goroutine for subscription receiver. + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + ps.receiveWithRetry(ctx) }() return nil @@ -119,13 +125,41 @@ func (ps *PubSub) Stop() { ps.wg.Wait() } -func (ps *PubSub) subReceive(cctx context.Context) { +// receiveWithRetry is called within a goroutine and manages keeping a +// subscription.Receive() up and running while the plugin has not been stopped. +func (ps *PubSub) receiveWithRetry(parentCtx context.Context) { + err := ps.startReceiver(parentCtx) + + for err != nil && parentCtx.Err() == nil { + log.Printf("E! 
[inputs.cloud_pubsub] Receiver for subscription %s exited with error: %v", ps.sub.ID(), err) + + delay := defaultRetryDelaySeconds + if ps.RetryReceiveDelaySeconds > 0 { + delay = ps.RetryReceiveDelaySeconds + } + + log.Printf("I! [inputs.cloud_pubsub] Waiting %d seconds before attempting to restart receiver...", delay) + time.Sleep(time.Duration(delay) * time.Second) + + err = ps.startReceiver(parentCtx) + } +} + +func (ps *PubSub) startReceiver(parentCtx context.Context) error { + log.Printf("I! [inputs.cloud_pubsub] Starting receiver for subscription %s...", ps.sub.ID()) + cctx, ccancel := context.WithCancel(parentCtx) err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) { if err := ps.onMessage(ctx, msg); err != nil { ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err)) } }) - ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err)) + if err != nil { + ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err)) + } else { + log.Printf("I! [inputs.cloud_pubsub] subscription pull ended (no error, most likely stopped)") + } + ccancel() + return err } // onMessage handles parsing and adding a received message to the accumulator. 
@@ -153,8 +187,8 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { break } - ps.mu.Lock() - defer ps.mu.Unlock() + ps.Lock() + defer ps.Unlock() id := ps.acc.AddTrackingMetricGroup(metrics) if ps.undelivered == nil { @@ -165,10 +199,10 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { return nil } -func (ps *PubSub) receiveDelivered(ctx context.Context) { +func (ps *PubSub) waitForDelivery(parentCtx context.Context) { for { select { - case <-ctx.Done(): + case <-parentCtx.Done(): return case info := <-ps.acc.Delivered(): <-ps.sem @@ -182,8 +216,8 @@ func (ps *PubSub) receiveDelivered(ctx context.Context) { } func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message { - ps.mu.Lock() - defer ps.mu.Unlock() + ps.Lock() + defer ps.Unlock() msg, ok := ps.undelivered[id] if !ok { @@ -219,7 +253,7 @@ func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) { return client, nil } -func (ps *PubSub) getGCPSubscription(ctx context.Context, subId string) (subscription, error) { +func (ps *PubSub) getGCPSubscription(subId string) (subscription, error) { client, err := ps.getPubSubClient() if err != nil { return nil, err @@ -262,6 +296,12 @@ const sampleConfig = ` ## Application Default Credentials, which is preferred. # credentials_file = "path/to/my/creds.json" + ## Optional. Number of seconds to wait before attempting to restart the + ## PubSub subscription receiver after an unexpected error. + ## If the streaming pull for a PubSub Subscription fails (receiver), + ## the agent attempts to restart receiving messages after this many seconds. + # retry_delay_seconds = 5 + ## Optional. Maximum byte length of a message to consume. ## Larger messages are dropped with an error. If less than 0 or unspecified, ## treated as no limit. 
diff --git a/plugins/inputs/cloud_pubsub/pubsub_test.go b/plugins/inputs/cloud_pubsub/pubsub_test.go index fd3ffb63e..be6070d15 100644 --- a/plugins/inputs/cloud_pubsub/pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/pubsub_test.go @@ -1,6 +1,7 @@ package cloud_pubsub import ( + "errors" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -21,6 +22,7 @@ func TestRunParse(t *testing.T) { id: subId, messages: make(chan *testMsg, 100), } + sub.receiver = testMessagesReceive(sub) ps := &PubSub{ parser: testParser, @@ -62,6 +64,7 @@ func TestRunInvalidMessages(t *testing.T) { id: subId, messages: make(chan *testMsg, 100), } + sub.receiver = testMessagesReceive(sub) ps := &PubSub{ parser: testParser, @@ -107,6 +110,7 @@ func TestRunOverlongMessages(t *testing.T) { id: subId, messages: make(chan *testMsg, 100), } + sub.receiver = testMessagesReceive(sub) ps := &PubSub{ parser: testParser, @@ -141,6 +145,41 @@ func TestRunOverlongMessages(t *testing.T) { assert.Equal(t, acc.NFields(), 0) } +func TestRunErrorInSubscriber(t *testing.T) { + subId := "sub-unexpected-error" + + acc := &testutil.Accumulator{} + + testParser, _ := parsers.NewInfluxParser() + + sub := &stubSub{ + id: subId, + messages: make(chan *testMsg, 100), + } + fakeErrStr := "a fake error" + sub.receiver = testMessagesError(sub, errors.New("a fake error")) + + ps := &PubSub{ + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subId, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + RetryReceiveDelaySeconds: 1, + } + + if err := ps.Start(acc); err != nil { + t.Fatalf("test PubSub failed to start: %s", err) + } + defer ps.Stop() + + if ps.sub == nil { + t.Fatal("expected plugin subscription to be non-nil") + } + acc.WaitError(1) + assert.Regexp(t, fakeErrStr, acc.Errors[0]) +} + func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) { 
assert.Equal(t, "cpu_load_short", m.Measurement) assert.Equal(t, "server01", m.Tags["host"]) diff --git a/plugins/inputs/cloud_pubsub/subscription_stub.go b/plugins/inputs/cloud_pubsub/subscription_stub.go index 018c5472c..e061728ca 100644 --- a/plugins/inputs/cloud_pubsub/subscription_stub.go +++ b/plugins/inputs/cloud_pubsub/subscription_stub.go @@ -9,6 +9,7 @@ import ( type stubSub struct { id string messages chan *testMsg + receiver receiveFunc } func (s *stubSub) ID() string { @@ -16,12 +17,26 @@ func (s *stubSub) ID() string { } func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message)) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case m := <-s.messages: - f(ctx, m) + return s.receiver(ctx, f) +} + +type receiveFunc func(ctx context.Context, f func(context.Context, message)) error + +func testMessagesError(s *stubSub, expectedErr error) receiveFunc { + return func(ctx context.Context, f func(context.Context, message)) error { + return expectedErr + } +} + +func testMessagesReceive(s *stubSub) receiveFunc { + return func(ctx context.Context, f func(context.Context, message)) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case m := <-s.messages: + f(ctx, m) + } } } }