Add cloud_pubsub input plugin (#5136)

Authored by emily on 2019-01-03 16:20:07 -08:00, committed by Daniel Nelson
parent 3a7a40a0a4
commit f42d9378ba
8 changed files with 747 additions and 0 deletions

Gopkg.lock (generated), 21 additions

@@ -7,8 +7,13 @@
packages = [
"civil",
"compute/metadata",
"iam",
"internal/optional",
"internal/version",
"monitoring/apiv3",
"pubsub",
"pubsub/apiv1",
"pubsub/internal/distribution",
]
pruneopts = ""
revision = "c728a003b238b26cef9ab6753a5dc424b331c3ad"
@@ -1217,6 +1222,17 @@
pruneopts = ""
revision = "d2e6202438beef2727060aa7cabdd924d92ebfd9"
[[projects]]
branch = "master"
digest = "1:88ecca26e54f601a8733c9a31d9f0883b915216a177673f0467f6b864fd0d90f"
name = "golang.org/x/sync"
packages = [
"errgroup",
"semaphore",
]
pruneopts = ""
revision = "42b317875d0fa942474b76e1b46a6060d720ae6e"
[[projects]]
branch = "master"
digest = "1:6a6eed3727d0e15703d9e930d8dbe333bea09eda309d75a015d3c6dc4e5c92a6"
@@ -1277,6 +1293,7 @@
"internal",
"iterator",
"option",
"support/bundler",
"transport",
"transport/grpc",
"transport/http",
@@ -1316,7 +1333,9 @@
"googleapis/api/label",
"googleapis/api/metric",
"googleapis/api/monitoredres",
"googleapis/iam/v1",
"googleapis/monitoring/v3",
"googleapis/pubsub/v1",
"googleapis/rpc/status",
"protobuf/field_mask",
]
@@ -1459,6 +1478,7 @@
analyzer-version = 1
input-imports = [
"cloud.google.com/go/monitoring/apiv3",
"cloud.google.com/go/pubsub",
"collectd.org/api",
"collectd.org/network",
"github.com/Azure/go-autorest/autorest",
@@ -1562,6 +1582,7 @@
"golang.org/x/net/html/charset",
"golang.org/x/oauth2",
"golang.org/x/oauth2/clientcredentials",
"golang.org/x/oauth2/google",
"golang.org/x/sys/unix",
"golang.org/x/sys/windows",
"golang.org/x/sys/windows/svc",


@@ -18,7 +18,9 @@ import (
"time"
"unicode"
"fmt"
"github.com/alecthomas/units"
"runtime"
)
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
@@ -58,6 +60,11 @@ func Version() string {
return version
}
// ProductToken returns a tag for Telegraf that can be used in user agents.
func ProductToken() string {
return fmt.Sprintf("Telegraf/%s Go/%s", Version(), runtime.Version())
}
// UnmarshalTOML parses the duration from the TOML config file
func (d *Duration) UnmarshalTOML(b []byte) error {
var err error

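ProductToken is consumed later in this commit as the PubSub client's user agent (via `option.WithUserAgent`). A minimal sketch of calling it directly; the exact output depends on the build's version information:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/internal"
)

func main() {
	// Prints something like "Telegraf/1.9.1 Go/go1.11.4",
	// depending on how the binary was built.
	fmt.Println(internal.ProductToken())
}
```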

@@ -14,6 +14,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub"
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
_ "github.com/influxdata/telegraf/plugins/inputs/consul"


@@ -0,0 +1,90 @@
# Google Cloud PubSub Input Plugin
The Google Cloud PubSub input plugin reads messages from [Google Cloud PubSub][pubsub]
and creates metrics using one of the supported [input data formats][].
### Configuration
This section contains the default TOML to configure the plugin. You can
generate it using `telegraf --usage cloud_pubsub`.
```toml
[[inputs.cloud_pubsub]]
## Required. Name of Google Cloud Platform (GCP) Project that owns
## the given PubSub subscription.
project = "my-project"
## Required. Name of PubSub subscription to ingest metrics from.
subscription = "my-subscription"
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
## Optional. Maximum byte length of a message to consume.
## Larger messages are dropped with an error. If less than 0 or unspecified,
## treated as no limit.
# max_message_len = 1000000
## Optional. Maximum messages to read from PubSub that have not been written
## to an output. Defaults to 1000.
## For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message contains 10 metrics and the output
## metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## The following are optional Subscription ReceiveSettings in PubSub.
## Read more about these values:
## https://godoc.org/cloud.google.com/go/pubsub#ReceiveSettings
## Optional. Maximum number of seconds for which a PubSub subscription
## should auto-extend the PubSub ACK deadline for each message. If less than
## 0, auto-extension is disabled.
# max_extension = 0
## Optional. Maximum number of unprocessed messages in PubSub
## (unacknowledged but not yet expired in PubSub).
## A value of 0 is treated as the default PubSub value.
## Negative values will be treated as unlimited.
# max_outstanding_messages = 0
## Optional. Maximum size in bytes of unprocessed messages in PubSub
## (unacknowledged but not yet expired in PubSub).
## A value of 0 is treated as the default PubSub value.
## Negative values will be treated as unlimited.
# max_outstanding_bytes = 0
## Optional. Max number of goroutines a PubSub Subscription receiver can spawn
## to pull messages from PubSub concurrently. This limit applies to each
## subscription separately and is treated as the PubSub default if less than
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0
```
### Multiple Subscriptions and Topics
This plugin assumes you have already created a PULL subscription for a given
PubSub topic. To learn how to do so, see [how to create a subscription][pubsub create sub].
Each instance of the plugin listens to a single subscription, so to pull messages
from multiple subscriptions or topics you will need to run one instance of the
plugin per subscription.
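If you prefer to create the pull subscription programmatically rather than through the Cloud Console or `gcloud`, a minimal sketch using the Go client library looks like the following (the project, topic, and subscription names are placeholders):

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// Placeholder project ID; substitute your own.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// A subscription created without a push endpoint is a pull subscription,
	// which is what this plugin expects.
	_, err = client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
		Topic: client.Topic("my-topic"),
	})
	if err != nil {
		log.Fatalf("CreateSubscription: %v", err)
	}
}
```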
[pubsub]: https://cloud.google.com/pubsub
[pubsub create sub]: https://cloud.google.com/pubsub/docs/admin#create_a_pull_subscription
[input data formats]: /docs/DATA_FORMATS_INPUT.md


@@ -0,0 +1,307 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"sync"
)
type empty struct{}
type semaphore chan empty
const defaultMaxUndeliveredMessages = 1000
type PubSub struct {
CredentialsFile string `toml:"credentials_file"`
Project string `toml:"project"`
Subscription string `toml:"subscription"`
// Subscription ReceiveSettings
MaxExtension internal.Duration `toml:"max_extension"`
MaxOutstandingMessages int `toml:"max_outstanding_messages"`
MaxOutstandingBytes int `toml:"max_outstanding_bytes"`
MaxReceiverGoRoutines int `toml:"max_receiver_go_routines"`
// Agent settings
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
sub subscription
stubSub func() subscription
cancel context.CancelFunc
parser parsers.Parser
wg *sync.WaitGroup
acc telegraf.TrackingAccumulator
mu sync.Mutex
undelivered map[telegraf.TrackingID]message
sem semaphore
}
func (ps *PubSub) Description() string {
return "Read metrics from Google PubSub"
}
func (ps *PubSub) SampleConfig() string {
return fmt.Sprintf(sampleConfig, defaultMaxUndeliveredMessages)
}
// Gather does nothing for this service input.
func (ps *PubSub) Gather(acc telegraf.Accumulator) error {
return nil
}
// SetParser implements ParserInput interface.
func (ps *PubSub) SetParser(parser parsers.Parser) {
ps.parser = parser
}
// Start initializes the plugin and begins processing messages from Google PubSub.
// Two goroutines are started: one receiving messages from the subscription,
// the other handling delivery notifications from the accumulator.
func (ps *PubSub) Start(ac telegraf.Accumulator) error {
if ps.Subscription == "" {
return fmt.Errorf(`"subscription" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
cctx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel
if ps.stubSub != nil {
ps.sub = ps.stubSub()
} else {
subRef, err := ps.getGCPSubscription(cctx, ps.Subscription)
if err != nil {
return err
}
ps.sub = subRef
}
ps.wg = &sync.WaitGroup{}
ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)
ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
// Start the subscription receiver in a new goroutine.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
ps.subReceive(cctx)
}()
// Start goroutine to handle delivery notifications from accumulator.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
ps.receiveDelivered(cctx)
}()
return nil
}
// Stop stops the subscription receiver by canceling the context
// and waits for all goroutines to finish.
func (ps *PubSub) Stop() {
ps.cancel()
ps.wg.Wait()
}
func (ps *PubSub) subReceive(cctx context.Context) {
err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) {
if err := ps.onMessage(ctx, msg); err != nil {
ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %v", ps.sub.ID(), err))
}
})
ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %v", ps.sub.ID(), err))
}
// onMessage handles parsing and adding a received message to the accumulator.
func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
if ps.MaxMessageLen > 0 && len(msg.Data()) > ps.MaxMessageLen {
msg.Ack()
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
}
metrics, err := ps.parser.Parse(msg.Data())
if err != nil {
msg.Ack()
return err
}
if len(metrics) == 0 {
msg.Ack()
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case ps.sem <- empty{}:
break
}
ps.mu.Lock()
defer ps.mu.Unlock()
id := ps.acc.AddTrackingMetricGroup(metrics)
if ps.undelivered == nil {
ps.undelivered = make(map[telegraf.TrackingID]message)
}
ps.undelivered[id] = msg
return nil
}
func (ps *PubSub) receiveDelivered(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case info := <-ps.acc.Delivered():
<-ps.sem
msg := ps.removeDelivered(info.ID())
if msg != nil {
msg.Ack()
}
}
}
}
func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message {
ps.mu.Lock()
defer ps.mu.Unlock()
msg, ok := ps.undelivered[id]
if !ok {
return nil
}
delete(ps.undelivered, id)
return msg
}
func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) {
var credsOpt option.ClientOption
if ps.CredentialsFile != "" {
credsOpt = option.WithCredentialsFile(ps.CredentialsFile)
} else {
creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform)
if err != nil {
return nil, fmt.Errorf(
"unable to find GCP Application Default Credentials: %v."+
"Either set ADC or provide CredentialsFile config", err)
}
credsOpt = option.WithCredentials(creds)
}
client, err := pubsub.NewClient(
context.Background(),
ps.Project,
credsOpt,
option.WithScopes(pubsub.ScopeCloudPlatform),
option.WithUserAgent(internal.ProductToken()),
)
if err != nil {
return nil, fmt.Errorf("unable to generate PubSub client: %v", err)
}
return client, nil
}
func (ps *PubSub) getGCPSubscription(ctx context.Context, subId string) (subscription, error) {
client, err := ps.getPubSubClient()
if err != nil {
return nil, err
}
s := client.Subscription(subId)
s.ReceiveSettings = pubsub.ReceiveSettings{
NumGoroutines: ps.MaxReceiverGoRoutines,
MaxExtension: ps.MaxExtension.Duration,
MaxOutstandingMessages: ps.MaxOutstandingMessages,
MaxOutstandingBytes: ps.MaxOutstandingBytes,
}
return &gcpSubscription{s}, nil
}
func init() {
inputs.Add("cloud_pubsub", func() telegraf.Input {
ps := &PubSub{
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
return ps
})
}
const sampleConfig = `
## Required. Name of Google Cloud Platform (GCP) Project that owns
## the given PubSub subscription.
project = "my-project"
## Required. Name of PubSub subscription to ingest metrics from.
subscription = "my-subscription"
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## PubSub APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
## Optional. Maximum byte length of a message to consume.
## Larger messages are dropped with an error. If less than 0 or unspecified,
## treated as no limit.
# max_message_len = 1000000
## Optional. Maximum messages to read from PubSub that have not been written
## to an output. Defaults to %d.
## For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message contains 10 metrics and the output
## metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## The following are optional Subscription ReceiveSettings in PubSub.
## Read more about these values:
## https://godoc.org/cloud.google.com/go/pubsub#ReceiveSettings
## Optional. Maximum number of seconds for which a PubSub subscription
## should auto-extend the PubSub ACK deadline for each message. If less than
## 0, auto-extension is disabled.
# max_extension = 0
## Optional. Maximum number of unprocessed messages in PubSub
## (unacknowledged but not yet expired in PubSub).
## A value of 0 is treated as the default PubSub value.
## Negative values will be treated as unlimited.
# max_outstanding_messages = 0
## Optional. Maximum size in bytes of unprocessed messages in PubSub
## (unacknowledged but not yet expired in PubSub).
## A value of 0 is treated as the default PubSub value.
## Negative values will be treated as unlimited.
# max_outstanding_bytes = 0
## Optional. Max number of goroutines a PubSub Subscription receiver can spawn
## to pull messages from PubSub concurrently. This limit applies to each
## subscription separately and is treated as the PubSub default if less than
## 1. Note this setting does not limit the number of messages that can be
## processed concurrently (use "max_outstanding_messages" instead).
# max_receiver_go_routines = 0
`
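For illustration only, here is a self-contained sketch of the bounded in-flight pattern used above: a buffered channel acts as a counting semaphore so that at most `max_undelivered_messages` message groups sit between parsing and delivery. Unlike the plugin, this simplified version releases the slot in the worker itself instead of in a separate delivery-notification goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxUndelivered = 3 // stands in for max_undelivered_messages

	type empty struct{}
	sem := make(chan empty, maxUndelivered) // counting semaphore

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		sem <- empty{} // acquire a slot; blocks once maxUndelivered are in flight
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot once the message is "delivered"
			fmt.Println("delivered message", id)
		}(i)
	}
	wg.Wait()
}
```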


@@ -0,0 +1,149 @@
package cloud_pubsub
import (
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)
const (
msgInflux = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
)
// Test ingesting an InfluxDB line protocol message from PubSub
func TestRunParse(t *testing.T) {
subId := "sub-run-parse"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, 1, acc.NFields())
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunInvalidMessages(t *testing.T) {
subId := "sub-invalid-messages"
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: "~invalidInfluxMsg~",
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged the message so we don't receive it again.
testTracker.WaitForAck(1)
assert.Equal(t, 0, acc.NFields())
}
func TestRunOverlongMessages(t *testing.T) {
subId := "sub-message-too-long"
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
sub := &stubSub{
id: subId,
messages: make(chan *testMsg, 100),
}
ps := &PubSub{
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subId,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
// Set MaxMessageLen so the test message exceeds the limit.
MaxMessageLen: 1,
}
if err := ps.Start(acc); err != nil {
t.Fatalf("test PubSub failed to start: %s", err)
}
defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged the message so we don't receive it again.
testTracker.WaitForAck(1)
assert.Equal(t, 0, acc.NFields())
}
func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) {
assert.Equal(t, "cpu_load_short", m.Measurement)
assert.Equal(t, "server01", m.Tags["host"])
assert.Equal(t, 23422.0, m.Fields["value"])
assert.Equal(t, int64(1422568543702900257), m.Time.UnixNano())
}


@@ -0,0 +1,68 @@
package cloud_pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"time"
)
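// subscription and message are narrow interfaces over the Cloud PubSub
// client's Subscription and Message types so that tests can substitute
// in-memory stubs for the real service.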
type (
subscription interface {
ID() string
Receive(ctx context.Context, f func(context.Context, message)) error
}
message interface {
Ack()
Nack()
ID() string
Data() []byte
Attributes() map[string]string
PublishTime() time.Time
}
gcpSubscription struct {
sub *pubsub.Subscription
}
gcpMessage struct {
msg *pubsub.Message
}
)
func (s *gcpSubscription) ID() string {
if s.sub == nil {
return ""
}
return s.sub.ID()
}
func (s *gcpSubscription) Receive(ctx context.Context, f func(context.Context, message)) error {
return s.sub.Receive(ctx, func(cctx context.Context, m *pubsub.Message) {
f(cctx, &gcpMessage{m})
})
}
func (env *gcpMessage) Ack() {
env.msg.Ack()
}
func (env *gcpMessage) Nack() {
env.msg.Nack()
}
func (env *gcpMessage) ID() string {
return env.msg.ID
}
func (env *gcpMessage) Data() []byte {
return env.msg.Data
}
func (env *gcpMessage) Attributes() map[string]string {
return env.msg.Attributes
}
func (env *gcpMessage) PublishTime() time.Time {
return env.msg.PublishTime
}


@@ -0,0 +1,104 @@
package cloud_pubsub
import (
"context"
"sync"
"time"
)
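// stubSub is an in-memory implementation of the subscription interface,
// used by the tests in place of a real PubSub subscription.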
type stubSub struct {
id string
messages chan *testMsg
}
func (s *stubSub) ID() string {
return s.id
}
func (s *stubSub) Receive(ctx context.Context, f func(context.Context, message)) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case m := <-s.messages:
f(ctx, m)
}
}
}
type testMsg struct {
id string
value string
attributes map[string]string
publishTime time.Time
tracker *testTracker
}
func (tm *testMsg) Ack() {
tm.tracker.Ack()
}
func (tm *testMsg) Nack() {
tm.tracker.Nack()
}
func (tm *testMsg) ID() string {
return tm.id
}
func (tm *testMsg) Data() []byte {
return []byte(tm.value)
}
func (tm *testMsg) Attributes() map[string]string {
return tm.attributes
}
func (tm *testMsg) PublishTime() time.Time {
return tm.publishTime
}
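// testTracker counts Ack and Nack calls so tests can block until a message
// has been acknowledged or rejected.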
type testTracker struct {
sync.Mutex
*sync.Cond
numAcks int
numNacks int
}
func (t *testTracker) WaitForAck(num int) {
t.Lock()
if t.Cond == nil {
t.Cond = sync.NewCond(&t.Mutex)
}
for t.numAcks < num {
t.Wait()
}
t.Unlock()
}
func (t *testTracker) WaitForNack(num int) {
t.Lock()
if t.Cond == nil {
t.Cond = sync.NewCond(&t.Mutex)
}
for t.numNacks < num {
t.Wait()
}
t.Unlock()
}
func (t *testTracker) Ack() {
t.Lock()
defer t.Unlock()
t.numAcks++
// Wake any goroutines blocked in WaitForAck.
if t.Cond != nil {
t.Broadcast()
}
}
func (t *testTracker) Nack() {
t.Lock()
defer t.Unlock()
t.numNacks++
// Wake any goroutines blocked in WaitForNack.
if t.Cond != nil {
t.Broadcast()
}
}