diff --git a/Gopkg.lock b/Gopkg.lock index 521740e05..ba4564a3d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -7,8 +7,13 @@ packages = [ "civil", "compute/metadata", + "iam", + "internal/optional", "internal/version", "monitoring/apiv3", + "pubsub", + "pubsub/apiv1", + "pubsub/internal/distribution", ] pruneopts = "" revision = "c728a003b238b26cef9ab6753a5dc424b331c3ad" @@ -1217,6 +1222,17 @@ pruneopts = "" revision = "d2e6202438beef2727060aa7cabdd924d92ebfd9" +[[projects]] + branch = "master" + digest = "1:88ecca26e54f601a8733c9a31d9f0883b915216a177673f0467f6b864fd0d90f" + name = "golang.org/x/sync" + packages = [ + "errgroup", + "semaphore", + ] + pruneopts = "" + revision = "42b317875d0fa942474b76e1b46a6060d720ae6e" + [[projects]] branch = "master" digest = "1:6a6eed3727d0e15703d9e930d8dbe333bea09eda309d75a015d3c6dc4e5c92a6" @@ -1277,6 +1293,7 @@ "internal", "iterator", "option", + "support/bundler", "transport", "transport/grpc", "transport/http", @@ -1316,7 +1333,9 @@ "googleapis/api/label", "googleapis/api/metric", "googleapis/api/monitoredres", + "googleapis/iam/v1", "googleapis/monitoring/v3", + "googleapis/pubsub/v1", "googleapis/rpc/status", "protobuf/field_mask", ] @@ -1459,6 +1478,7 @@ analyzer-version = 1 input-imports = [ "cloud.google.com/go/monitoring/apiv3", + "cloud.google.com/go/pubsub", "collectd.org/api", "collectd.org/network", "github.com/Azure/go-autorest/autorest", @@ -1562,6 +1582,7 @@ "golang.org/x/net/html/charset", "golang.org/x/oauth2", "golang.org/x/oauth2/clientcredentials", + "golang.org/x/oauth2/google", "golang.org/x/sys/unix", "golang.org/x/sys/windows", "golang.org/x/sys/windows/svc", diff --git a/internal/internal.go b/internal/internal.go index 8acf63e96..a0a3ec0ec 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -18,7 +18,9 @@ import ( "time" "unicode" + "fmt" "github.com/alecthomas/units" + "runtime" ) const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -58,6 +60,11 @@ func Version() string { return version } +// ProductToken returns a tag for Telegraf that can be used in user agents. +func ProductToken() string { + return fmt.Sprintf("Telegraf/%s Go/%s", Version(), runtime.Version()) +} + // UnmarshalTOML parses the duration from the TOML config file func (d *Duration) UnmarshalTOML(b []byte) error { var err error diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index cfdc12ad2..9c183fcbb 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/ceph" _ "github.com/influxdata/telegraf/plugins/inputs/cgroup" _ "github.com/influxdata/telegraf/plugins/inputs/chrony" + _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/inputs/conntrack" _ "github.com/influxdata/telegraf/plugins/inputs/consul" diff --git a/plugins/inputs/cloud_pubsub/README.md b/plugins/inputs/cloud_pubsub/README.md new file mode 100644 index 000000000..159c793f2 --- /dev/null +++ b/plugins/inputs/cloud_pubsub/README.md @@ -0,0 +1,90 @@ +# Google Cloud PubSub Input Plugin + +The GCP PubSub plugin ingests metrics from [Google Cloud PubSub][pubsub] +and creates metrics using one of the supported [input data formats][]. + + +### Configuration + +This section contains the default TOML to configure the plugin. You can +generate it using `telegraf --usage pubsub`. + +```toml +[[inputs.pubsub]] +## Required. Name of Google Cloud Platform (GCP) Project that owns + ## the given PubSub subscription. + project = "my-project" + + ## Required. Name of PubSub subscription to ingest metrics from. + subscription = "my-subscription" + + ## Required. Data format to consume. + ## Each data format has its own unique set of configuration options. + ## Read more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Optional. Filepath for GCP credentials JSON file to authorize calls to + ## PubSub APIs. If not set explicitly, Telegraf will attempt to use + ## Application Default Credentials, which is preferred. + # credentials_file = "path/to/my/creds.json" + + ## Optional. Maximum byte length of a message to consume. + ## Larger messages are dropped with an error. If less than 0 or unspecified, + ## treated as no limit. + # max_message_len = 1000000 + + ## Optional. Maximum messages to read from PubSub that have not been written + ## to an output. Defaults to %d. + ## For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message contains 10 metrics and the output + ## metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## The following are optional Subscription ReceiveSettings in PubSub. + ## Read more about these values: + ## https://godoc.org/cloud.google.com/go/pubsub#ReceiveSettings + + ## Optional. Maximum number of seconds for which a PubSub subscription + ## should auto-extend the PubSub ACK deadline for each message. If less than + ## 0, auto-extension is disabled. + # max_extension = 0 + + ## Optional. Maximum number of unprocessed messages in PubSub + ## (unacknowledged but not yet expired in PubSub). + ## A value of 0 is treated as the default PubSub value. + ## Negative values will be treated as unlimited. + # max_outstanding_messages = 0 + + ## Optional. Maximum size in bytes of unprocessed messages in PubSub + ## (unacknowledged but not yet expired in PubSub). + ## A value of 0 is treated as the default PubSub value. + ## Negative values will be treated as unlimited. + # max_outstanding_bytes = 0 + + ## Optional. Max number of goroutines a PubSub Subscription receiver can spawn + ## to pull messages from PubSub concurrently. This limit applies to each + ## subscription separately and is treated as the PubSub default if less than + ## 1. Note this setting does not limit the number of messages that can be + ## processed concurrently (use "max_outstanding_messages" instead). + # max_receiver_go_routines = 0 +``` + +### Multiple Subscriptions and Topics + +This plugin assumes you have already created a PULL subscription for a given +PubSub topic. To learn how to do so, see [how to create a subscription][pubsub create sub]. + +Each plugin agent can listen to one subscription at a time, so you will +need to run multiple instances of the plugin to pull messages from multiple +subscriptions/topics. + + + +[pubsub]: https://cloud.google.com/pubsub +[pubsub create sub]: https://cloud.google.com/pubsub/docs/admin#create_a_pull_subscription +[input data formats]: /docs/DATA_FORMATS_INPUT.md diff --git a/plugins/inputs/cloud_pubsub/pubsub.go b/plugins/inputs/cloud_pubsub/pubsub.go new file mode 100644 index 000000000..bb22a8dcb --- /dev/null +++ b/plugins/inputs/cloud_pubsub/pubsub.go @@ -0,0 +1,307 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "context" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + "golang.org/x/oauth2/google" + "google.golang.org/api/option" + "sync" +) + +type empty struct{} +type semaphore chan empty + +const defaultMaxUndeliveredMessages = 1000 + +type PubSub struct { + CredentialsFile string `toml:"credentials_file"` + Project string `toml:"project"` + Subscription string `toml:"subscription"` + + // Subscription ReceiveSettings + MaxExtension internal.Duration `toml:"max_extension"` + MaxOutstandingMessages int `toml:"max_outstanding_messages"` + MaxOutstandingBytes int `toml:"max_outstanding_bytes"` + MaxReceiverGoRoutines int `toml:"max_receiver_go_routines"` + + // Agent settings + MaxMessageLen int `toml:"max_message_len"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + + sub subscription + stubSub func() subscription + + cancel context.CancelFunc + + parser parsers.Parser + wg *sync.WaitGroup + acc telegraf.TrackingAccumulator + mu sync.Mutex + + undelivered map[telegraf.TrackingID]message + sem semaphore +} + +func (ps *PubSub) Description() string { + return "Read metrics from Google PubSub" +} + +func (ps *PubSub) SampleConfig() string { + return fmt.Sprintf(sampleConfig, defaultMaxUndeliveredMessages) +} + +// Gather does nothing for this service input. +func (ps *PubSub) Gather(acc telegraf.Accumulator) error { + return nil +} + +// SetParser implements ParserInput interface. +func (ps *PubSub) SetParser(parser parsers.Parser) { + ps.parser = parser +} + +// Start initializes the plugin and processing messages from Google PubSub. +// Two goroutines are started - one pulling for the subscription, one +// receiving delivery notifications from the accumulator. +func (ps *PubSub) Start(ac telegraf.Accumulator) error { + if ps.Subscription == "" { + return fmt.Errorf(`"subscription" is required`) + } + + if ps.Project == "" { + return fmt.Errorf(`"project" is required`) + } + + cctx, cancel := context.WithCancel(context.Background()) + ps.cancel = cancel + + if ps.stubSub != nil { + ps.sub = ps.stubSub() + } else { + subRef, err := ps.getGCPSubscription(cctx, ps.Subscription) + if err != nil { + return err + } + ps.sub = subRef + } + + ps.wg = &sync.WaitGroup{} + ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) + ps.sem = make(semaphore, ps.MaxUndeliveredMessages) + + // Start receiver in new goroutine for each subscription. + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + ps.subReceive(cctx) + }() + + // Start goroutine to handle delivery notifications from accumulator. + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + ps.receiveDelivered(cctx) + }() + + return nil +} + +// Stop ensures the PubSub subscriptions receivers are stopped by +// canceling the context and waits for goroutines to finish. +func (ps *PubSub) Stop() { + ps.cancel() + ps.wg.Wait() +} + +func (ps *PubSub) subReceive(cctx context.Context) { + err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) { + if err := ps.onMessage(ctx, msg); err != nil { + ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err)) + } + }) + ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err)) +} + +// onMessage handles parsing and adding a received message to the accumulator. +func (ps *PubSub) onMessage(ctx context.Context, msg message) error { + if ps.MaxMessageLen > 0 && len(msg.Data()) > ps.MaxMessageLen { + msg.Ack() + return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) + } + + metrics, err := ps.parser.Parse(msg.Data()) + if err != nil { + msg.Ack() + return err + } + + if len(metrics) == 0 { + msg.Ack() + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ps.sem <- empty{}: + break + } + + ps.mu.Lock() + defer ps.mu.Unlock() + + id := ps.acc.AddTrackingMetricGroup(metrics) + if ps.undelivered == nil { + ps.undelivered = make(map[telegraf.TrackingID]message) + } + ps.undelivered[id] = msg + + return nil +} + +func (ps *PubSub) receiveDelivered(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case info := <-ps.acc.Delivered(): + <-ps.sem + msg := ps.removeDelivered(info.ID()) + + if msg != nil { + msg.Ack() + } + } + } +} + +func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message { + ps.mu.Lock() + defer ps.mu.Unlock() + + msg, ok := ps.undelivered[id] + if !ok { + return nil + } + delete(ps.undelivered, id) + return msg +} + +func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) { + var credsOpt option.ClientOption + if ps.CredentialsFile != "" { + credsOpt = option.WithCredentialsFile(ps.CredentialsFile) + } else { + creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform) + if err != nil { + return nil, fmt.Errorf( + "unable to find GCP Application Default Credentials: %v."+ + "Either set ADC or provide CredentialsFile config", err) + } + credsOpt = option.WithCredentials(creds) + } + client, err := pubsub.NewClient( + context.Background(), + ps.Project, + credsOpt, + option.WithScopes(pubsub.ScopeCloudPlatform), + option.WithUserAgent(internal.ProductToken()), + ) + if err != nil { + return nil, fmt.Errorf("unable to generate PubSub client: %v", err) + } + return client, nil +} + +func (ps *PubSub) getGCPSubscription(ctx context.Context, subId string) (subscription, error) { + client, err := ps.getPubSubClient() + if err != nil { + return nil, err + } + s := client.Subscription(subId) + s.ReceiveSettings = pubsub.ReceiveSettings{ + NumGoroutines: ps.MaxReceiverGoRoutines, + MaxExtension: ps.MaxExtension.Duration, + MaxOutstandingMessages: ps.MaxOutstandingMessages, + MaxOutstandingBytes: ps.MaxOutstandingBytes, + } + return &gcpSubscription{s}, nil +} + +func init() { + inputs.Add("cloud_pubsub", func() telegraf.Input { + ps := &PubSub{ + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + } + return ps + }) +} + +const sampleConfig = ` + ## Required. Name of Google Cloud Platform (GCP) Project that owns + ## the given PubSub subscription. + project = "my-project" + + ## Required. Name of PubSub subscription to ingest metrics from. + subscription = "my-subscription" + + ## Required. Data format to consume. + ## Each data format has its own unique set of configuration options. + ## Read more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Optional. Filepath for GCP credentials JSON file to authorize calls to + ## PubSub APIs. If not set explicitly, Telegraf will attempt to use + ## Application Default Credentials, which is preferred. + # credentials_file = "path/to/my/creds.json" + + ## Optional. Maximum byte length of a message to consume. + ## Larger messages are dropped with an error. If less than 0 or unspecified, + ## treated as no limit. + # max_message_len = 1000000 + + ## Optional. Maximum messages to read from PubSub that have not been written + ## to an output. Defaults to %d. + ## For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message contains 10 metrics and the output + ## metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## The following are optional Subscription ReceiveSettings in PubSub. + ## Read more about these values: + ## https://godoc.org/cloud.google.com/go/pubsub#ReceiveSettings + + ## Optional. Maximum number of seconds for which a PubSub subscription + ## should auto-extend the PubSub ACK deadline for each message. If less than + ## 0, auto-extension is disabled. + # max_extension = 0 + + ## Optional. Maximum number of unprocessed messages in PubSub + ## (unacknowledged but not yet expired in PubSub). + ## A value of 0 is treated as the default PubSub value. + ## Negative values will be treated as unlimited. + # max_outstanding_messages = 0 + + ## Optional. Maximum size in bytes of unprocessed messages in PubSub + ## (unacknowledged but not yet expired in PubSub). + ## A value of 0 is treated as the default PubSub value. + ## Negative values will be treated as unlimited. + # max_outstanding_bytes = 0 + + ## Optional. Max number of goroutines a PubSub Subscription receiver can spawn + ## to pull messages from PubSub concurrently. This limit applies to each + ## subscription separately and is treated as the PubSub default if less than + ## 1. Note this setting does not limit the number of messages that can be + ## processed concurrently (use "max_outstanding_messages" instead). + # max_receiver_go_routines = 0 +` diff --git a/plugins/inputs/cloud_pubsub/pubsub_test.go b/plugins/inputs/cloud_pubsub/pubsub_test.go new file mode 100644 index 000000000..fd3ffb63e --- /dev/null +++ b/plugins/inputs/cloud_pubsub/pubsub_test.go @@ -0,0 +1,149 @@ +package cloud_pubsub + +import ( + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "testing" +) + +const ( + msgInflux = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" +) + +// Test ingesting InfluxDB-format PubSub message +func TestRunParse(t *testing.T) { + subId := "sub-run-parse" + + testParser, _ := parsers.NewInfluxParser() + + sub := &stubSub{ + id: subId, + messages: make(chan *testMsg, 100), + } + + ps := &PubSub{ + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subId, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + } + + acc := &testutil.Accumulator{} + if err := ps.Start(acc); err != nil { + t.Fatalf("test PubSub failed to start: %s", err) + } + defer ps.Stop() + + if ps.sub == nil { + t.Fatal("expected plugin subscription to be non-nil") + } + + testTracker := &testTracker{} + msg := &testMsg{ + value: msgInflux, + tracker: testTracker, + } + sub.messages <- msg + + acc.Wait(1) + assert.Equal(t, acc.NFields(), 1) + metric := acc.Metrics[0] + validateTestInfluxMetric(t, metric) +} + +func TestRunInvalidMessages(t *testing.T) { + subId := "sub-invalid-messages" + + testParser, _ := parsers.NewInfluxParser() + + sub := &stubSub{ + id: subId, + messages: make(chan *testMsg, 100), + } + + ps := &PubSub{ + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subId, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + } + + acc := &testutil.Accumulator{} + + if err := ps.Start(acc); err != nil { + t.Fatalf("test PubSub failed to start: %s", err) + } + defer ps.Stop() + if ps.sub == nil { + t.Fatal("expected plugin subscription to be non-nil") + } + + testTracker := &testTracker{} + msg := &testMsg{ + value: "~invalidInfluxMsg~", + tracker: testTracker, + } + sub.messages <- msg + + acc.WaitError(1) + + // Make sure we acknowledged message so we don't receive it again. + testTracker.WaitForAck(1) + + assert.Equal(t, acc.NFields(), 0) +} + +func TestRunOverlongMessages(t *testing.T) { + subId := "sub-message-too-long" + + acc := &testutil.Accumulator{} + + testParser, _ := parsers.NewInfluxParser() + + sub := &stubSub{ + id: subId, + messages: make(chan *testMsg, 100), + } + + ps := &PubSub{ + parser: testParser, + stubSub: func() subscription { return sub }, + Project: "projectIDontMatterForTests", + Subscription: subId, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + // Add MaxMessageLen Param + MaxMessageLen: 1, + } + + if err := ps.Start(acc); err != nil { + t.Fatalf("test PubSub failed to start: %s", err) + } + defer ps.Stop() + if ps.sub == nil { + t.Fatal("expected plugin subscription to be non-nil") + } + + testTracker := &testTracker{} + msg := &testMsg{ + value: msgInflux, + tracker: testTracker, + } + sub.messages <- msg + + acc.WaitError(1) + + // Make sure we acknowledged message so we don't receive it again. + testTracker.WaitForAck(1) + + assert.Equal(t, acc.NFields(), 0) +} + +func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) { + assert.Equal(t, "cpu_load_short", m.Measurement) + assert.Equal(t, "server01", m.Tags["host"]) + assert.Equal(t, 23422.0, m.Fields["value"]) + assert.Equal(t, int64(1422568543702900257), m.Time.UnixNano()) +} diff --git a/plugins/inputs/cloud_pubsub/subscription_gcp.go b/plugins/inputs/cloud_pubsub/subscription_gcp.go new file mode 100644 index 000000000..f436d5219 --- /dev/null +++ b/plugins/inputs/cloud_pubsub/subscription_gcp.go @@ -0,0 +1,68 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "context" + "time" +) + +type ( + subscription interface { + ID() string + Receive(ctx context.Context, f func(context.Context, message)) error + } + + message interface { + Ack() + Nack() + ID() string + Data() []byte + Attributes() map[string]string + PublishTime() time.Time + } + + gcpSubscription struct { + sub *pubsub.Subscription + } + + gcpMessage struct { + msg *pubsub.Message + } +) + +func (s *gcpSubscription) ID() string { + if s.sub == nil { + return "" + } + return s.sub.ID() +} + +func (s *gcpSubscription) Receive(ctx context.Context, f func(context.Context, message)) error { + return s.sub.Receive(ctx, func(cctx context.Context, m *pubsub.Message) { + f(cctx, &gcpMessage{m}) + }) +} + +func (env *gcpMessage) Ack() { + env.msg.Ack() +} + +func (env *gcpMessage) Nack() { + env.msg.Nack() +} + +func (env *gcpMessage) ID() string { + return env.msg.ID +} + +func (env *gcpMessage) Data() []byte { + return env.msg.Data +} + +func (env *gcpMessage) Attributes() map[string]string { + return env.msg.Attributes +} + +func (env *gcpMessage) PublishTime() time.Time { + return env.msg.PublishTime +} diff --git a/plugins/inputs/cloud_pubsub/subscription_stub.go b/plugins/inputs/cloud_pubsub/subscription_stub.go new file mode 100644 index 000000000..018c5472c --- /dev/null +++ b/plugins/inputs/cloud_pubsub/subscription_stub.go @@ -0,0 +1,104 @@ +package cloud_pubsub + +import ( + "context" + "sync" + "time" +) + +type stubSub struct { + id string + messages chan *testMsg +} + +func (s *stubSub) ID() string { + return s.id +} + +func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message)) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case m := <-s.messages: + f(ctx, m) + } + } +} + +type testMsg struct { + id string + value string + attributes map[string]string + publishTime time.Time + + tracker *testTracker +} + +func (tm *testMsg) Ack() { + tm.tracker.Ack() +} + +func (tm *testMsg) Nack() { + tm.tracker.Nack() +} + +func (tm *testMsg) ID() string { + return tm.id +} + +func (tm *testMsg) Data() []byte { + return []byte(tm.value) +} + +func (tm *testMsg) Attributes() map[string]string { + return tm.attributes +} + +func (tm *testMsg) PublishTime() time.Time { + return tm.publishTime +} + +type testTracker struct { + sync.Mutex + *sync.Cond + + numAcks int + numNacks int +} + +func (t *testTracker) WaitForAck(num int) { + t.Lock() + if t.Cond == nil { + t.Cond = sync.NewCond(&t.Mutex) + } + for t.numAcks < num { + t.Wait() + } + t.Unlock() +} + +func (t *testTracker) WaitForNack(num int) { + t.Lock() + if t.Cond == nil { + t.Cond = sync.NewCond(&t.Mutex) + } + for t.numNacks < num { + t.Wait() + } + t.Unlock() +} + +func (t *testTracker) Ack() { + t.Lock() + defer t.Unlock() + + t.numAcks++ +} + +func (t *testTracker) Nack() { + t.Lock() + defer t.Unlock() + + t.numNacks++ +}