Add GCP Cloud Pubsub output plugin (#5202)

This commit is contained in:
emily 2019-01-08 15:53:02 -08:00 committed by Daniel Nelson
parent 2474a3a54b
commit f5f85aa74f
7 changed files with 738 additions and 0 deletions

1
Gopkg.lock generated
View File

@ -1588,6 +1588,7 @@
"golang.org/x/sys/windows/svc",
"golang.org/x/sys/windows/svc/mgr",
"google.golang.org/api/option",
"google.golang.org/api/support/bundler",
"google.golang.org/genproto/googleapis/api/metric",
"google.golang.org/genproto/googleapis/api/monitoredres",
"google.golang.org/genproto/googleapis/monitoring/v3",

View File

@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/application_insights"
_ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor"
_ "github.com/influxdata/telegraf/plugins/outputs/cloud_pubsub"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"

View File

@ -0,0 +1,61 @@
# Google Cloud PubSub Output Plugin
The GCP PubSub plugin publishes metrics to a [Google Cloud PubSub][pubsub] topic
as one of the supported [output data formats][].
### Configuration
This section contains the default TOML to configure the plugin. You can
generate it using `telegraf --usage pubsub`.
```toml
[[inputs.pubsub]]
## Required. Name of Google Cloud Platform (GCP) Project that owns
## the given PubSub subscription.
project = "my-project"
## Required. Name of PubSub subscription to ingest metrics from.
subscription = "my-subscription"
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
## Optional. If true, will send all metrics per write in one PubSub message.
# send_batched = true
## The following publish_* parameters specifically configures batching
## requests made to the GCP Cloud PubSub API via the PubSub Golang library. Read
## more here: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings
## Optional. Send a request to PubSub (i.e. actually publish a batch)
## when it has this many PubSub messages. If send_batched is true,
## this is ignored and treated as if it were 1.
# publish_count_threshold = 1000
## Optional. Send a request to PubSub (i.e. actually publish a batch)
## when it has this many PubSub messages. If send_batched is true,
## this is ignored and treated as if it were 1
# publish_byte_threshold = 1000000
## Optional. Specifically configures requests made to the PubSub API.
# publish_num_go_routines = 2
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"
## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
```
[pubsub]: https://cloud.google.com/pubsub
[output data formats]: /docs/DATA_FORMATS_OUTPUT.md

View File

@ -0,0 +1,263 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"sync"
)
const sampleConfig = `
[[inputs.pubsub]]
## Required. Name of Google Cloud Platform (GCP) Project that owns
## the given PubSub subscription.
project = "my-project"
## Required. Name of PubSub subscription to ingest metrics from.
subscription = "my-subscription"
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
## Optional. If true, will send all metrics per write in one PubSub message.
# send_batched = true
## The following publish_* parameters specifically configures batching
## requests made to the GCP Cloud PubSub API via the PubSub Golang library. Read
## more here: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings
## Optional. Send a request to PubSub (i.e. actually publish a batch)
## when it has this many PubSub messages. If send_batched is true,
## this is ignored and treated as if it were 1.
# publish_count_threshold = 1000
## Optional. Send a request to PubSub (i.e. actually publish a batch)
## when it has this many PubSub messages. If send_batched is true,
## this is ignored and treated as if it were 1
# publish_byte_threshold = 1000000
## Optional. Specifically configures requests made to the PubSub API.
# publish_num_go_routines = 2
## Optional. Specifies a timeout for requests to the PubSub API.
# publish_timeout = "30s"
## Optional. PubSub attributes to add to metrics.
# [[inputs.pubsub.attributes]]
# my_attr = "tag_value"
`
type PubSub struct {
CredentialsFile string `toml:"credentials_file"`
Project string `toml:"project"`
Topic string `toml:"topic"`
Attributes map[string]string `toml:"attributes"`
SendBatched bool `toml:"send_batched"`
PublishCountThreshold int `toml:"publish_count_threshold"`
PublishByteThreshold int `toml:"publish_byte_threshold"`
PublishNumGoroutines int `toml:"publish_num_go_routines"`
PublishTimeout internal.Duration `toml:"publish_timeout"`
t topic
c *pubsub.Client
stubTopic func(id string) topic
serializer serializers.Serializer
publishResults []publishResult
}
func (ps *PubSub) Description() string {
return "Publish Telegraf metrics to a Google Cloud PubSub topic"
}
func (ps *PubSub) SampleConfig() string {
return sampleConfig
}
func (ps *PubSub) SetSerializer(serializer serializers.Serializer) {
ps.serializer = serializer
}
func (ps *PubSub) Connect() error {
if ps.Topic == "" {
return fmt.Errorf(`"topic" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
if ps.stubTopic == nil {
return ps.initPubSubClient()
} else {
return nil
}
}
func (ps *PubSub) Close() error {
if ps.t != nil {
ps.t.Stop()
}
return nil
}
func (ps *PubSub) Write(metrics []telegraf.Metric) error {
ps.refreshTopic()
// Serialize metrics and package into appropriate PubSub messages
msgs, err := ps.toMessages(metrics)
if err != nil {
return err
}
cctx, cancel := context.WithCancel(context.Background())
// Publish all messages - each call to Publish returns a future.
ps.publishResults = make([]publishResult, len(msgs))
for i, m := range msgs {
ps.publishResults[i] = ps.t.Publish(cctx, m)
}
// topic.Stop() forces all published messages to be sent, even
// if PubSub batch limits have not been reached.
go ps.t.Stop()
return ps.waitForResults(cctx, cancel)
}
func (ps *PubSub) initPubSubClient() error {
var credsOpt option.ClientOption
if ps.CredentialsFile != "" {
credsOpt = option.WithCredentialsFile(ps.CredentialsFile)
} else {
creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform)
if err != nil {
return fmt.Errorf(
"unable to find GCP Application Default Credentials: %v."+
"Either set ADC or provide CredentialsFile config", err)
}
credsOpt = option.WithCredentials(creds)
}
client, err := pubsub.NewClient(
context.Background(),
ps.Project,
credsOpt,
option.WithScopes(pubsub.ScopeCloudPlatform),
option.WithUserAgent(internal.ProductToken()),
)
if err != nil {
return fmt.Errorf("unable to generate PubSub client: %v", err)
}
ps.c = client
return nil
}
func (ps *PubSub) refreshTopic() {
if ps.stubTopic != nil {
ps.t = ps.stubTopic(ps.Topic)
} else {
t := ps.c.Topic(ps.Topic)
ps.t = &topicWrapper{t}
}
ps.t.SetPublishSettings(ps.publishSettings())
}
func (ps *PubSub) publishSettings() pubsub.PublishSettings {
settings := pubsub.PublishSettings{}
if ps.PublishNumGoroutines > 0 {
settings.NumGoroutines = ps.PublishNumGoroutines
}
if ps.PublishTimeout.Duration > 0 {
settings.CountThreshold = 1
}
if ps.SendBatched {
settings.CountThreshold = 1
} else if ps.PublishCountThreshold > 0 {
settings.CountThreshold = ps.PublishCountThreshold
}
if ps.PublishByteThreshold > 0 {
settings.ByteThreshold = ps.PublishByteThreshold
}
return settings
}
func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, error) {
if ps.SendBatched {
b, err := ps.serializer.SerializeBatch(metrics)
if err != nil {
return nil, err
}
msg := &pubsub.Message{Data: b}
if ps.Attributes != nil {
msg.Attributes = ps.Attributes
}
return []*pubsub.Message{msg}, nil
}
msgs := make([]*pubsub.Message, len(metrics))
for i, m := range metrics {
b, err := ps.serializer.Serialize(m)
if err != nil {
return nil, err
}
msgs[i] = &pubsub.Message{
Data: b,
}
if ps.Attributes != nil {
msgs[i].Attributes = ps.Attributes
}
}
return msgs, nil
}
func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) error {
var pErr error
var setErr sync.Once
var wg sync.WaitGroup
for _, pr := range ps.publishResults {
wg.Add(1)
go func(r publishResult) {
defer wg.Done()
// Wait on each future
_, err := r.Get(ctx)
if err != nil {
setErr.Do(func() {
pErr = err
cancel()
})
}
}(pr)
}
wg.Wait()
return pErr
}
func init() {
outputs.Add("cloud_pubsub", func() telegraf.Output {
return &PubSub{}
})
}

View File

@ -0,0 +1,167 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)
func TestPubSub_WriteSingle(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
}
settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
}
}
func TestPubSub_WriteWithAttribute(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error*/},
}
settings := pubsub.DefaultPublishSettings
ps, topic, metrics := getTestResources(t, settings, testMetrics)
ps.Attributes = map[string]string{
"foo1": "bar1",
"foo2": "bar2",
}
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
msg := verifyMetricPublished(t, testM.m, topic.published)
assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
}
}
func TestPubSub_WriteMultiple(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error*/},
{testutil.TestMetric("value_2", "test"), false},
}
settings := pubsub.DefaultPublishSettings
ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count")
}
func TestPubSub_WriteOverCountThreshold(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error*/},
{testutil.TestMetric("value_2", "test"), false},
{testutil.TestMetric("value_3", "test"), false},
{testutil.TestMetric("value_4", "test"), false},
}
settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 2
ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}
func TestPubSub_WriteOverByteThreshold(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error*/},
{testutil.TestMetric("value_2", "test"), false},
}
settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 10
settings.ByteThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published)
}
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count")
}
func TestPubSub_Error(t *testing.T) {
testMetrics := []testMetric{
// Force this batch to return error
{testutil.TestMetric("value_1", "test"), true},
{testutil.TestMetric("value_2", "test"), false},
}
settings := pubsub.DefaultPublishSettings
ps, _, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics)
if err == nil {
t.Fatalf("expected error")
}
if err.Error() != errMockFail {
t.Fatalf("expected fake error, got %v", err)
}
}
func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
p, _ := parsers.NewInfluxParser()
v, _ := m.GetField("value")
psMsg, ok := published[v.(string)]
if !ok {
t.Fatalf("expected metric to get published (value: %s)", v.(string))
}
parsed, err := p.Parse(psMsg.Data)
if err != nil {
t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data))
}
if len(parsed) > 1 {
t.Fatalf("expected only one influxdb metric per published message, got %d", len(published))
}
publishedV, ok := parsed[0].GetField("value")
if !ok {
t.Fatalf("expected published metric to have a value")
}
assert.Equal(t, v, publishedV, "incorrect published value")
return psMsg
}

View File

@ -0,0 +1,46 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
)
type (
topicFactory func(string) (topic, error)
topic interface {
ID() string
Stop()
Publish(ctx context.Context, msg *pubsub.Message) publishResult
PublishSettings() pubsub.PublishSettings
SetPublishSettings(settings pubsub.PublishSettings)
}
publishResult interface {
Get(ctx context.Context) (string, error)
}
topicWrapper struct {
topic *pubsub.Topic
}
)
func (tw *topicWrapper) ID() string {
return tw.topic.ID()
}
func (tw *topicWrapper) Stop() {
tw.topic.Stop()
}
func (tw *topicWrapper) Publish(ctx context.Context, msg *pubsub.Message) publishResult {
return tw.topic.Publish(ctx, msg)
}
func (tw *topicWrapper) PublishSettings() pubsub.PublishSettings {
return tw.topic.PublishSettings
}
func (tw *topicWrapper) SetPublishSettings(settings pubsub.PublishSettings) {
tw.topic.PublishSettings = settings
}

View File

@ -0,0 +1,199 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"google.golang.org/api/support/bundler"
"runtime"
"sync"
"testing"
)
const (
errMockFail = "this is an error"
)
type (
testMetric struct {
m telegraf.Metric
returnErr bool
}
bundledMsg struct {
*pubsub.Message
*stubResult
}
stubResult struct {
metricIds []string
sendError bool
err chan error
done chan struct{}
}
stubTopic struct {
Settings pubsub.PublishSettings
ReturnErr map[string]bool
parsers.Parser
*testing.T
stopped bool
pLock sync.Mutex
published map[string]*pubsub.Message
bundler *bundler.Bundler
bLock sync.Mutex
bundleCount int
}
)
func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) {
s, _ := serializers.NewInfluxSerializer()
metrics := make([]telegraf.Metric, len(testM))
t := &stubTopic{
T: tT,
ReturnErr: make(map[string]bool),
published: make(map[string]*pubsub.Message),
}
for i, tm := range testM {
metrics[i] = tm.m
if tm.returnErr {
v, _ := tm.m.GetField("value")
t.ReturnErr[v.(string)] = true
}
}
ps := &PubSub{
Project: "test-project",
Topic: "test-topic",
stubTopic: func(string) topic { return t },
PublishCountThreshold: settings.CountThreshold,
PublishByteThreshold: settings.ByteThreshold,
PublishNumGoroutines: settings.NumGoroutines,
PublishTimeout: internal.Duration{Duration: settings.Timeout},
}
ps.SetSerializer(s)
return ps, t, metrics
}
func (t *stubTopic) ID() string {
return "test-topic"
}
func (t *stubTopic) Stop() {
t.pLock.Lock()
defer t.pLock.Unlock()
t.stopped = true
t.bundler.Flush()
}
func (t *stubTopic) Publish(ctx context.Context, msg *pubsub.Message) publishResult {
t.pLock.Lock()
defer t.pLock.Unlock()
if t.stopped || ctx.Err() != nil {
t.Fatalf("publish called after stop")
}
ids := t.parseIDs(msg)
r := &stubResult{
metricIds: ids,
err: make(chan error, 1),
done: make(chan struct{}, 1),
}
for _, id := range ids {
_, ok := t.ReturnErr[id]
r.sendError = r.sendError || ok
}
bundled := &bundledMsg{msg, r}
err := t.bundler.Add(bundled, len(msg.Data))
if err != nil {
t.Fatalf("unexpected error while adding to bundle: %v", err)
}
return r
}
func (t *stubTopic) PublishSettings() pubsub.PublishSettings {
return t.Settings
}
func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) {
t.Settings = settings
t.initBundler()
}
func (t *stubTopic) initBundler() *stubTopic {
t.bundler = bundler.NewBundler(&bundledMsg{}, t.sendBundle())
t.bundler.DelayThreshold = t.Settings.DelayThreshold
t.bundler.BundleCountThreshold = t.Settings.CountThreshold
if t.bundler.BundleCountThreshold > pubsub.MaxPublishRequestCount {
t.bundler.BundleCountThreshold = pubsub.MaxPublishRequestCount
}
t.bundler.BundleByteThreshold = t.Settings.ByteThreshold
t.bundler.BundleByteLimit = pubsub.MaxPublishRequestBytes
t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0)
return t
}
func (t *stubTopic) sendBundle() func(items interface{}) {
return func(items interface{}) {
t.bLock.Lock()
defer t.bLock.Unlock()
bundled := items.([]*bundledMsg)
for _, msg := range bundled {
r := msg.stubResult
if r.sendError {
r.err <- errors.New(errMockFail)
} else {
r.done <- struct{}{}
}
for _, id := range r.metricIds {
t.published[id] = msg.Message
}
}
t.bundleCount++
}
}
func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser()
metrics, err := p.Parse(msg.Data)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
ids := make([]string, len(metrics))
for i, met := range metrics {
id, _ := met.GetField("value")
ids[i] = id.(string)
}
return ids
}
func (r *stubResult) Get(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case err := <-r.err:
return "", err
case <-r.done:
return fmt.Sprintf("id-%s", r.metricIds[0]), nil
}
}