diff --git a/Gopkg.lock b/Gopkg.lock index ba4564a3d..c110818ec 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1588,6 +1588,7 @@ "golang.org/x/sys/windows/svc", "golang.org/x/sys/windows/svc/mgr", "google.golang.org/api/option", + "google.golang.org/api/support/bundler", "google.golang.org/genproto/googleapis/api/metric", "google.golang.org/genproto/googleapis/api/monitoredres", "google.golang.org/genproto/googleapis/monitoring/v3", diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 94c1421b5..a5d2a44da 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/application_insights" _ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor" + _ "github.com/influxdata/telegraf/plugins/outputs/cloud_pubsub" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" diff --git a/plugins/outputs/cloud_pubsub/README.md b/plugins/outputs/cloud_pubsub/README.md new file mode 100644 index 000000000..5c345de4b --- /dev/null +++ b/plugins/outputs/cloud_pubsub/README.md @@ -0,0 +1,61 @@ +# Google Cloud PubSub Output Plugin + +The GCP PubSub plugin publishes metrics to a [Google Cloud PubSub][pubsub] topic +as one of the supported [output data formats][]. + + +### Configuration + +This section contains the default TOML to configure the plugin. You can +generate it using `telegraf --usage cloud_pubsub`. + +```toml +[[outputs.cloud_pubsub]] + ## Required. Name of Google Cloud Platform (GCP) Project that owns + ## the given PubSub topic. + project = "my-project" + + ## Required. Name of PubSub topic to publish metrics to. + topic = "my-topic" + + ## Required. Data format to output. + ## Each data format has its own unique set of configuration options. 
+ ## Read more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" + + ## Optional. Filepath for GCP credentials JSON file to authorize calls to + ## PubSub APIs. If not set explicitly, Telegraf will attempt to use + ## Application Default Credentials, which is preferred. + # credentials_file = "path/to/my/creds.json" + + ## Optional. If true, will send all metrics per write in one PubSub message. + # send_batched = true + + ## The following publish_* parameters specifically configure batching of + ## requests made to the GCP Cloud PubSub API via the PubSub Golang library. Read + ## more here: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings + + ## Optional. Send a request to PubSub (i.e. actually publish a batch) + ## when it has this many PubSub messages. If send_batched is true, + ## this is ignored and treated as if it were 1. + # publish_count_threshold = 1000 + + ## Optional. Send a request to PubSub (i.e. actually publish a batch) + ## when its messages total this many bytes. Unlike the count threshold, + ## this is applied even when send_batched is true. + # publish_byte_threshold = 1000000 + + ## Optional. Number of goroutines the PubSub library uses to publish. + # publish_num_go_routines = 2 + + ## Optional. Specifies a timeout for requests to the PubSub API. + # publish_timeout = "30s" + + ## Optional. PubSub attributes to add to metrics. 
+ # [[inputs.pubsub.attributes]] + # my_attr = "tag_value" +``` + +[pubsub]: https://cloud.google.com/pubsub +[output data formats]: /docs/DATA_FORMATS_OUTPUT.md diff --git a/plugins/outputs/cloud_pubsub/pubsub.go b/plugins/outputs/cloud_pubsub/pubsub.go new file mode 100644 index 000000000..9811af1d7 --- /dev/null +++ b/plugins/outputs/cloud_pubsub/pubsub.go @@ -0,0 +1,263 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "context" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "golang.org/x/oauth2/google" + "google.golang.org/api/option" + "sync" +) + +const sampleConfig = ` +[[inputs.pubsub]] + ## Required. Name of Google Cloud Platform (GCP) Project that owns + ## the given PubSub subscription. + project = "my-project" + + ## Required. Name of PubSub subscription to ingest metrics from. + subscription = "my-subscription" + + ## Required. Data format to consume. + ## Each data format has its own unique set of configuration options. + ## Read more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Optional. Filepath for GCP credentials JSON file to authorize calls to + ## PubSub APIs. If not set explicitly, Telegraf will attempt to use + ## Application Default Credentials, which is preferred. + # credentials_file = "path/to/my/creds.json" + + ## Optional. If true, will send all metrics per write in one PubSub message. + # send_batched = true + + ## The following publish_* parameters specifically configures batching + ## requests made to the GCP Cloud PubSub API via the PubSub Golang library. Read + ## more here: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings + + ## Optional. Send a request to PubSub (i.e. actually publish a batch) + ## when it has this many PubSub messages. 
If send_batched is true, + ## this is ignored and treated as if it were 1. + # publish_count_threshold = 1000 + + ## Optional. Send a request to PubSub (i.e. actually publish a batch) + ## when it has this many PubSub messages. If send_batched is true, + ## this is ignored and treated as if it were 1 + # publish_byte_threshold = 1000000 + + ## Optional. Specifically configures requests made to the PubSub API. + # publish_num_go_routines = 2 + + ## Optional. Specifies a timeout for requests to the PubSub API. + # publish_timeout = "30s" + + ## Optional. PubSub attributes to add to metrics. + # [[inputs.pubsub.attributes]] + # my_attr = "tag_value" +` + +type PubSub struct { + CredentialsFile string `toml:"credentials_file"` + Project string `toml:"project"` + Topic string `toml:"topic"` + Attributes map[string]string `toml:"attributes"` + + SendBatched bool `toml:"send_batched"` + PublishCountThreshold int `toml:"publish_count_threshold"` + PublishByteThreshold int `toml:"publish_byte_threshold"` + PublishNumGoroutines int `toml:"publish_num_go_routines"` + PublishTimeout internal.Duration `toml:"publish_timeout"` + + t topic + c *pubsub.Client + + stubTopic func(id string) topic + + serializer serializers.Serializer + publishResults []publishResult +} + +func (ps *PubSub) Description() string { + return "Publish Telegraf metrics to a Google Cloud PubSub topic" +} + +func (ps *PubSub) SampleConfig() string { + return sampleConfig +} + +func (ps *PubSub) SetSerializer(serializer serializers.Serializer) { + ps.serializer = serializer +} + +func (ps *PubSub) Connect() error { + if ps.Topic == "" { + return fmt.Errorf(`"topic" is required`) + } + + if ps.Project == "" { + return fmt.Errorf(`"project" is required`) + } + + if ps.stubTopic == nil { + return ps.initPubSubClient() + } else { + return nil + } +} + +func (ps *PubSub) Close() error { + if ps.t != nil { + ps.t.Stop() + } + return nil +} + +func (ps *PubSub) Write(metrics []telegraf.Metric) error { + 
ps.refreshTopic() + + // Serialize metrics and package into appropriate PubSub messages + msgs, err := ps.toMessages(metrics) + if err != nil { + return err + } + + cctx, cancel := context.WithCancel(context.Background()) + + // Publish all messages - each call to Publish returns a future. + ps.publishResults = make([]publishResult, len(msgs)) + for i, m := range msgs { + ps.publishResults[i] = ps.t.Publish(cctx, m) + } + + // topic.Stop() forces all published messages to be sent, even + // if PubSub batch limits have not been reached. + go ps.t.Stop() + + return ps.waitForResults(cctx, cancel) +} + +func (ps *PubSub) initPubSubClient() error { + var credsOpt option.ClientOption + if ps.CredentialsFile != "" { + credsOpt = option.WithCredentialsFile(ps.CredentialsFile) + } else { + creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform) + if err != nil { + return fmt.Errorf( + "unable to find GCP Application Default Credentials: %v."+ + "Either set ADC or provide CredentialsFile config", err) + } + credsOpt = option.WithCredentials(creds) + } + client, err := pubsub.NewClient( + context.Background(), + ps.Project, + credsOpt, + option.WithScopes(pubsub.ScopeCloudPlatform), + option.WithUserAgent(internal.ProductToken()), + ) + if err != nil { + return fmt.Errorf("unable to generate PubSub client: %v", err) + } + ps.c = client + return nil +} + +func (ps *PubSub) refreshTopic() { + if ps.stubTopic != nil { + ps.t = ps.stubTopic(ps.Topic) + } else { + t := ps.c.Topic(ps.Topic) + ps.t = &topicWrapper{t} + } + ps.t.SetPublishSettings(ps.publishSettings()) +} + +func (ps *PubSub) publishSettings() pubsub.PublishSettings { + settings := pubsub.PublishSettings{} + if ps.PublishNumGoroutines > 0 { + settings.NumGoroutines = ps.PublishNumGoroutines + } + + if ps.PublishTimeout.Duration > 0 { + settings.CountThreshold = 1 + } + + if ps.SendBatched { + settings.CountThreshold = 1 + } else if ps.PublishCountThreshold > 0 { + 
settings.CountThreshold = ps.PublishCountThreshold + } + + if ps.PublishByteThreshold > 0 { + settings.ByteThreshold = ps.PublishByteThreshold + } + + return settings +} + +func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, error) { + if ps.SendBatched { + b, err := ps.serializer.SerializeBatch(metrics) + if err != nil { + return nil, err + } + msg := &pubsub.Message{Data: b} + if ps.Attributes != nil { + msg.Attributes = ps.Attributes + } + return []*pubsub.Message{msg}, nil + } + + msgs := make([]*pubsub.Message, len(metrics)) + for i, m := range metrics { + b, err := ps.serializer.Serialize(m) + if err != nil { + return nil, err + } + msgs[i] = &pubsub.Message{ + Data: b, + } + if ps.Attributes != nil { + msgs[i].Attributes = ps.Attributes + } + } + + return msgs, nil +} + +func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) error { + var pErr error + var setErr sync.Once + var wg sync.WaitGroup + + for _, pr := range ps.publishResults { + wg.Add(1) + + go func(r publishResult) { + defer wg.Done() + // Wait on each future + _, err := r.Get(ctx) + if err != nil { + setErr.Do(func() { + pErr = err + cancel() + }) + } + }(pr) + } + + wg.Wait() + return pErr +} + +func init() { + outputs.Add("cloud_pubsub", func() telegraf.Output { + return &PubSub{} + }) +} diff --git a/plugins/outputs/cloud_pubsub/pubsub_test.go b/plugins/outputs/cloud_pubsub/pubsub_test.go new file mode 100644 index 000000000..a60f05eb0 --- /dev/null +++ b/plugins/outputs/cloud_pubsub/pubsub_test.go @@ -0,0 +1,167 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestPubSub_WriteSingle(t *testing.T) { + + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error */}, + } + + settings := 
pubsub.DefaultPublishSettings + settings.CountThreshold = 1 + ps, topic, metrics := getTestResources(t, settings, testMetrics) + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published) + } +} + +func TestPubSub_WriteWithAttribute(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error*/}, + } + + settings := pubsub.DefaultPublishSettings + ps, topic, metrics := getTestResources(t, settings, testMetrics) + ps.Attributes = map[string]string{ + "foo1": "bar1", + "foo2": "bar2", + } + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + msg := verifyMetricPublished(t, testM.m, topic.published) + assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1") + assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2") + } +} + +func TestPubSub_WriteMultiple(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error*/}, + {testutil.TestMetric("value_2", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + + ps, topic, metrics := getTestResources(t, settings, testMetrics) + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published) + } + assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count") +} + +func TestPubSub_WriteOverCountThreshold(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error*/}, + {testutil.TestMetric("value_2", "test"), false}, + {testutil.TestMetric("value_3", "test"), false}, + {testutil.TestMetric("value_4", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + 
settings.CountThreshold = 2 + + ps, topic, metrics := getTestResources(t, settings, testMetrics) + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published) + } + assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") +} + +func TestPubSub_WriteOverByteThreshold(t *testing.T) { + testMetrics := []testMetric{ + {testutil.TestMetric("value_1", "test"), false /*return error*/}, + {testutil.TestMetric("value_2", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + settings.CountThreshold = 10 + settings.ByteThreshold = 1 + + ps, topic, metrics := getTestResources(t, settings, testMetrics) + + err := ps.Write(metrics) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + for _, testM := range testMetrics { + verifyMetricPublished(t, testM.m, topic.published) + } + assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") +} + +func TestPubSub_Error(t *testing.T) { + testMetrics := []testMetric{ + // Force this batch to return error + {testutil.TestMetric("value_1", "test"), true}, + {testutil.TestMetric("value_2", "test"), false}, + } + + settings := pubsub.DefaultPublishSettings + ps, _, metrics := getTestResources(t, settings, testMetrics) + + err := ps.Write(metrics) + if err == nil { + t.Fatalf("expected error") + } + if err.Error() != errMockFail { + t.Fatalf("expected fake error, got %v", err) + } +} + +func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { + p, _ := parsers.NewInfluxParser() + + v, _ := m.GetField("value") + psMsg, ok := published[v.(string)] + if !ok { + t.Fatalf("expected metric to get published (value: %s)", v.(string)) + } + + parsed, err := p.Parse(psMsg.Data) + if err != nil { + t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data)) + } + if len(parsed) > 1 { + 
t.Fatalf("expected only one influxdb metric per published message, got %d", len(published)) + } + + publishedV, ok := parsed[0].GetField("value") + if !ok { + t.Fatalf("expected published metric to have a value") + } + assert.Equal(t, v, publishedV, "incorrect published value") + + return psMsg +} diff --git a/plugins/outputs/cloud_pubsub/topic_gcp.go b/plugins/outputs/cloud_pubsub/topic_gcp.go new file mode 100644 index 000000000..a85c6f39e --- /dev/null +++ b/plugins/outputs/cloud_pubsub/topic_gcp.go @@ -0,0 +1,46 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "context" +) + +type ( + topicFactory func(string) (topic, error) + + topic interface { + ID() string + Stop() + Publish(ctx context.Context, msg *pubsub.Message) publishResult + PublishSettings() pubsub.PublishSettings + SetPublishSettings(settings pubsub.PublishSettings) + } + + publishResult interface { + Get(ctx context.Context) (string, error) + } + + topicWrapper struct { + topic *pubsub.Topic + } +) + +func (tw *topicWrapper) ID() string { + return tw.topic.ID() +} + +func (tw *topicWrapper) Stop() { + tw.topic.Stop() +} + +func (tw *topicWrapper) Publish(ctx context.Context, msg *pubsub.Message) publishResult { + return tw.topic.Publish(ctx, msg) +} + +func (tw *topicWrapper) PublishSettings() pubsub.PublishSettings { + return tw.topic.PublishSettings +} + +func (tw *topicWrapper) SetPublishSettings(settings pubsub.PublishSettings) { + tw.topic.PublishSettings = settings +} diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go new file mode 100644 index 000000000..fdae70bc3 --- /dev/null +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -0,0 +1,199 @@ +package cloud_pubsub + +import ( + "cloud.google.com/go/pubsub" + "context" + "errors" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/parsers" + 
"github.com/influxdata/telegraf/plugins/serializers" + "google.golang.org/api/support/bundler" + "runtime" + "sync" + "testing" +) + +const ( + errMockFail = "this is an error" +) + +type ( + testMetric struct { + m telegraf.Metric + returnErr bool + } + + bundledMsg struct { + *pubsub.Message + *stubResult + } + + stubResult struct { + metricIds []string + + sendError bool + err chan error + done chan struct{} + } + + stubTopic struct { + Settings pubsub.PublishSettings + ReturnErr map[string]bool + parsers.Parser + *testing.T + + stopped bool + pLock sync.Mutex + + published map[string]*pubsub.Message + + bundler *bundler.Bundler + bLock sync.Mutex + bundleCount int + } +) + +func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) { + s, _ := serializers.NewInfluxSerializer() + + metrics := make([]telegraf.Metric, len(testM)) + t := &stubTopic{ + T: tT, + ReturnErr: make(map[string]bool), + published: make(map[string]*pubsub.Message), + } + + for i, tm := range testM { + metrics[i] = tm.m + if tm.returnErr { + v, _ := tm.m.GetField("value") + t.ReturnErr[v.(string)] = true + } + } + + ps := &PubSub{ + Project: "test-project", + Topic: "test-topic", + stubTopic: func(string) topic { return t }, + PublishCountThreshold: settings.CountThreshold, + PublishByteThreshold: settings.ByteThreshold, + PublishNumGoroutines: settings.NumGoroutines, + PublishTimeout: internal.Duration{Duration: settings.Timeout}, + } + ps.SetSerializer(s) + + return ps, t, metrics +} + +func (t *stubTopic) ID() string { + return "test-topic" +} + +func (t *stubTopic) Stop() { + t.pLock.Lock() + defer t.pLock.Unlock() + + t.stopped = true + t.bundler.Flush() +} + +func (t *stubTopic) Publish(ctx context.Context, msg *pubsub.Message) publishResult { + t.pLock.Lock() + defer t.pLock.Unlock() + + if t.stopped || ctx.Err() != nil { + t.Fatalf("publish called after stop") + } + + ids := t.parseIDs(msg) + r := &stubResult{ + 
metricIds: ids, + err: make(chan error, 1), + done: make(chan struct{}, 1), + } + + for _, id := range ids { + _, ok := t.ReturnErr[id] + r.sendError = r.sendError || ok + } + + bundled := &bundledMsg{msg, r} + err := t.bundler.Add(bundled, len(msg.Data)) + if err != nil { + t.Fatalf("unexpected error while adding to bundle: %v", err) + } + return r +} + +func (t *stubTopic) PublishSettings() pubsub.PublishSettings { + return t.Settings +} + +func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) { + t.Settings = settings + t.initBundler() +} + +func (t *stubTopic) initBundler() *stubTopic { + t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle()) + t.bundler.DelayThreshold = t.Settings.DelayThreshold + t.bundler.BundleCountThreshold = t.Settings.CountThreshold + if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount { + t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount + } + t.bundler.BundleByteThreshold = t.Settings.ByteThreshold + t.bundler.BundleByteLimit = pubsub.MaxPublishRequestBytes + t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0) + + return t +} + +func (t *stubTopic) sendBundle() func(items interface{}) { + return func(items interface{}) { + t.bLock.Lock() + defer t.bLock.Unlock() + + bundled := items.([]*bundledMsg) + + for _, msg := range bundled { + r := msg.stubResult + if r.sendError { + r.err <- errors.New(errMockFail) + } else { + r.done <- struct{}{} + } + for _, id := range r.metricIds { + t.published[id] = msg.Message + } + } + + t.bundleCount++ + } +} + +func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { + p, _ := parsers.NewInfluxParser() + metrics, err := p.Parse(msg.Data) + if err != nil { + t.Fatalf("unexpected parsing error: %v", err) + } + ids := make([]string, len(metrics)) + for i, met := range metrics { + id, _ := met.GetField("value") + ids[i] = id.(string) + } + return ids +} + +func (r *stubResult) Get(ctx context.Context) (string, error) { + select { + case 
<-ctx.Done(): + return "", ctx.Err() + case err := <-r.err: + return "", err + case <-r.done: + return fmt.Sprintf("id-%s", r.metricIds[0]), nil + } +}