package cloud_pubsub import ( "context" "fmt" "sync" "encoding/base64" "time" "cloud.google.com/go/pubsub" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "golang.org/x/oauth2/google" "google.golang.org/api/option" ) type empty struct{} type semaphore chan empty const defaultMaxUndeliveredMessages = 1000 const defaultRetryDelaySeconds = 5 type PubSub struct { sync.Mutex CredentialsFile string `toml:"credentials_file"` Project string `toml:"project"` Subscription string `toml:"subscription"` // Subscription ReceiveSettings MaxExtension internal.Duration `toml:"max_extension"` MaxOutstandingMessages int `toml:"max_outstanding_messages"` MaxOutstandingBytes int `toml:"max_outstanding_bytes"` MaxReceiverGoRoutines int `toml:"max_receiver_go_routines"` // Agent settings MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"` Base64Data bool `toml:"base64_data"` Log telegraf.Logger sub subscription stubSub func() subscription cancel context.CancelFunc parser parsers.Parser wg *sync.WaitGroup acc telegraf.TrackingAccumulator undelivered map[telegraf.TrackingID]message sem semaphore } func (ps *PubSub) Description() string { return "Read metrics from Google PubSub" } func (ps *PubSub) SampleConfig() string { return fmt.Sprintf(sampleConfig, defaultMaxUndeliveredMessages) } // Gather does nothing for this service input. func (ps *PubSub) Gather(acc telegraf.Accumulator) error { return nil } // SetParser implements ParserInput interface. func (ps *PubSub) SetParser(parser parsers.Parser) { ps.parser = parser } // Start initializes the plugin and processing messages from Google PubSub. // Two goroutines are started - one pulling for the subscription, one // receiving delivery notifications from the accumulator. func (ps *PubSub) Start(ac telegraf.Accumulator) error { if ps.Subscription == "" { return fmt.Errorf(`"subscription" is required`) } if ps.Project == "" { return fmt.Errorf(`"project" is required`) } ps.sem = make(semaphore, ps.MaxUndeliveredMessages) ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) // Create top-level context with cancel that will be called on Stop(). ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel if ps.stubSub != nil { ps.sub = ps.stubSub() } else { subRef, err := ps.getGCPSubscription(ps.Subscription) if err != nil { return fmt.Errorf("unable to create subscription handle: %v", err) } ps.sub = subRef } ps.wg = &sync.WaitGroup{} // Start goroutine to handle delivery notifications from accumulator. ps.wg.Add(1) go func() { defer ps.wg.Done() ps.waitForDelivery(ctx) }() // Start goroutine for subscription receiver. ps.wg.Add(1) go func() { defer ps.wg.Done() ps.receiveWithRetry(ctx) }() return nil } // Stop ensures the PubSub subscriptions receivers are stopped by // canceling the context and waits for goroutines to finish. func (ps *PubSub) Stop() { ps.cancel() ps.wg.Wait() } // startReceiver is called within a goroutine and manages keeping a // subscription.Receive() up and running while the plugin has not been stopped. func (ps *PubSub) receiveWithRetry(parentCtx context.Context) { err := ps.startReceiver(parentCtx) for err != nil && parentCtx.Err() == nil { ps.Log.Errorf("Receiver for subscription %s exited with error: %v", ps.sub.ID(), err) delay := defaultRetryDelaySeconds if ps.RetryReceiveDelaySeconds > 0 { delay = ps.RetryReceiveDelaySeconds } ps.Log.Infof("Waiting %d seconds before attempting to restart receiver...", delay) time.Sleep(time.Duration(delay) * time.Second) err = ps.startReceiver(parentCtx) } } func (ps *PubSub) startReceiver(parentCtx context.Context) error { ps.Log.Infof("Starting receiver for subscription %s...", ps.sub.ID()) cctx, ccancel := context.WithCancel(parentCtx) err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) { if err := ps.onMessage(ctx, msg); err != nil { ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err)) } }) if err != nil { ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err)) } else { ps.Log.Info("Subscription pull ended (no error, most likely stopped)") } ccancel() return err } // onMessage handles parsing and adding a received message to the accumulator. func (ps *PubSub) onMessage(ctx context.Context, msg message) error { if ps.MaxMessageLen > 0 && len(msg.Data()) > ps.MaxMessageLen { msg.Ack() return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) } var data []byte if ps.Base64Data { strData, err := base64.StdEncoding.DecodeString(string(msg.Data())) if err != nil { return fmt.Errorf("unable to base64 decode message: %v", err) } data = []byte(strData) } else { data = msg.Data() } metrics, err := ps.parser.Parse(data) if err != nil { msg.Ack() return err } if len(metrics) == 0 { msg.Ack() return nil } select { case <-ctx.Done(): return ctx.Err() case ps.sem <- empty{}: break } ps.Lock() defer ps.Unlock() id := ps.acc.AddTrackingMetricGroup(metrics) if ps.undelivered == nil { ps.undelivered = make(map[telegraf.TrackingID]message) } ps.undelivered[id] = msg return nil } func (ps *PubSub) waitForDelivery(parentCtx context.Context) { for { select { case <-parentCtx.Done(): return case info := <-ps.acc.Delivered(): <-ps.sem msg := ps.removeDelivered(info.ID()) if msg != nil { msg.Ack() } } } } func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message { ps.Lock() defer ps.Unlock() msg, ok := ps.undelivered[id] if !ok { return nil } delete(ps.undelivered, id) return msg } func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) { var credsOpt option.ClientOption if ps.CredentialsFile != "" { credsOpt = option.WithCredentialsFile(ps.CredentialsFile) } else { creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform) if err != nil { return nil, fmt.Errorf( "unable to find GCP Application Default Credentials: %v."+ "Either set ADC or provide CredentialsFile config", err) } credsOpt = option.WithCredentials(creds) } client, err := pubsub.NewClient( context.Background(), ps.Project, credsOpt, option.WithScopes(pubsub.ScopeCloudPlatform), option.WithUserAgent(internal.ProductToken()), ) if err != nil { return nil, fmt.Errorf("unable to generate PubSub client: %v", err) } return client, nil } func (ps *PubSub) getGCPSubscription(subId string) (subscription, error) { client, err := ps.getPubSubClient() if err != nil { return nil, err } s := client.Subscription(subId) s.ReceiveSettings = pubsub.ReceiveSettings{ NumGoroutines: ps.MaxReceiverGoRoutines, MaxExtension: ps.MaxExtension.Duration, MaxOutstandingMessages: ps.MaxOutstandingMessages, MaxOutstandingBytes: ps.MaxOutstandingBytes, } return &gcpSubscription{s}, nil } func init() { inputs.Add("cloud_pubsub", func() telegraf.Input { ps := &PubSub{ MaxUndeliveredMessages: defaultMaxUndeliveredMessages, } return ps }) } const sampleConfig = ` ## Required. Name of Google Cloud Platform (GCP) Project that owns ## the given PubSub subscription. project = "my-project" ## Required. Name of PubSub subscription to ingest metrics from. subscription = "my-subscription" ## Required. Data format to consume. ## Each data format has its own unique set of configuration options. ## Read more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ## Optional. Filepath for GCP credentials JSON file to authorize calls to ## PubSub APIs. If not set explicitly, Telegraf will attempt to use ## Application Default Credentials, which is preferred. # credentials_file = "path/to/my/creds.json" ## Optional. Number of seconds to wait before attempting to restart the ## PubSub subscription receiver after an unexpected error. ## If the streaming pull for a PubSub Subscription fails (receiver), ## the agent attempts to restart receiving messages after this many seconds. # retry_delay_seconds = 5 ## Optional. Maximum byte length of a message to consume. ## Larger messages are dropped with an error. If less than 0 or unspecified, ## treated as no limit. # max_message_len = 1000000 ## Optional. Maximum messages to read from PubSub that have not been written ## to an output. Defaults to %d. ## For best throughput set based on the number of metrics within ## each message and the size of the output's metric_batch_size. ## ## For example, if each message contains 10 metrics and the output ## metric_batch_size is 1000, setting this to 100 will ensure that a ## full batch is collected and the write is triggered immediately without ## waiting until the next flush_interval. # max_undelivered_messages = 1000 ## The following are optional Subscription ReceiveSettings in PubSub. ## Read more about these values: ## https://godoc.org/cloud.google.com/go/pubsub#ReceiveSettings ## Optional. Maximum number of seconds for which a PubSub subscription ## should auto-extend the PubSub ACK deadline for each message. If less than ## 0, auto-extension is disabled. # max_extension = 0 ## Optional. Maximum number of unprocessed messages in PubSub ## (unacknowledged but not yet expired in PubSub). ## A value of 0 is treated as the default PubSub value. ## Negative values will be treated as unlimited. # max_outstanding_messages = 0 ## Optional. Maximum size in bytes of unprocessed messages in PubSub ## (unacknowledged but not yet expired in PubSub). ## A value of 0 is treated as the default PubSub value. ## Negative values will be treated as unlimited. # max_outstanding_bytes = 0 ## Optional. Max number of goroutines a PubSub Subscription receiver can spawn ## to pull messages from PubSub concurrently. This limit applies to each ## subscription separately and is treated as the PubSub default if less than ## 1. Note this setting does not limit the number of messages that can be ## processed concurrently (use "max_outstanding_messages" instead). # max_receiver_go_routines = 0 ## Optional. If true, Telegraf will attempt to base64 decode the ## PubSub message data before parsing # base64_data = false `