Add cloud pubsub push input plugin (#5442)
This commit is contained in:
parent
c023ffe0a5
commit
c6612a4e4a
|
@ -15,6 +15,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
|
_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
|
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
|
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
# Google Cloud PubSub Push Input Service Plugin
|
||||||
|
|
||||||
|
The Google Cloud PubSub Push listener is a service input plugin that listens for messages sent via an HTTP POST from [Google Cloud PubSub][pubsub].
|
||||||
|
The plugin expects messages in Google's Pub/Sub JSON Format ONLY.
|
||||||
|
The intent of the plugin is to allow Telegraf to serve as an endpoint of the Google Pub/Sub 'Push' service.
|
||||||
|
Google's PubSub service will **only** send over HTTPS/TLS so this plugin must be behind a valid proxy or must be configured to use TLS.
|
||||||
|
|
||||||
|
Enable TLS by specifying the file names of a service TLS certificate and key.
|
||||||
|
|
||||||
|
Enable mutually authenticated TLS and authorize client connections by signing certificate authority by including a list of allowed CA certificate file names in `tls_allowed_cacerts`.
|
||||||
|
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
This is a sample configuration for the plugin.
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.cloud_pubsub_push]]
|
||||||
|
## Address and port to host HTTP listener on
|
||||||
|
service_address = ":8080"
|
||||||
|
|
||||||
|
## Application secret to verify messages originate from Cloud Pub/Sub
|
||||||
|
# token = ""
|
||||||
|
|
||||||
|
## Path to listen to.
|
||||||
|
# path = "/"
|
||||||
|
|
||||||
|
## Maximum duration before timing out read of the request
|
||||||
|
# read_timeout = "10s"
|
||||||
|
## Maximum duration before timing out write of the response. This should be set to a value
|
||||||
|
## large enough that you can send at least 'metric_batch_size' number of messages within the
|
||||||
|
## duration.
|
||||||
|
# write_timeout = "10s"
|
||||||
|
|
||||||
|
## Maximum allowed http request body size in bytes.
|
||||||
|
## 0 means to use the default of 524,288,00 bytes (500 mebibytes)
|
||||||
|
# max_body_size = "500MB"
|
||||||
|
|
||||||
|
## Whether to add the pubsub metadata, such as message attributes and subscription as a tag.
|
||||||
|
# add_meta = false
|
||||||
|
|
||||||
|
## Optional. Maximum messages to read from PubSub that have not been written
|
||||||
|
## to an output. Defaults to 1000.
|
||||||
|
## For best throughput set based on the number of metrics within
|
||||||
|
## each message and the size of the output's metric_batch_size.
|
||||||
|
##
|
||||||
|
## For example, if each message contains 10 metrics and the output
|
||||||
|
## metric_batch_size is 1000, setting this to 100 will ensure that a
|
||||||
|
## full batch is collected and the write is triggered immediately without
|
||||||
|
## waiting until the next flush_interval.
|
||||||
|
# max_undelivered_messages = 1000
|
||||||
|
|
||||||
|
## Set one or more allowed client CA certificate file names to
|
||||||
|
## enable mutually authenticated TLS connections
|
||||||
|
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||||
|
|
||||||
|
## Add service certificate and key
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
data_format = "influx"
|
||||||
|
```
|
||||||
|
|
||||||
|
This plugin assumes you have already created a PUSH subscription for a given
|
||||||
|
PubSub topic.
|
||||||
|
|
||||||
|
[pubsub]: https://cloud.google.com/pubsub
|
||||||
|
[input data formats]: /docs/DATA_FORMATS_INPUT.md
|
|
@ -0,0 +1,323 @@
|
||||||
|
package cloud_pubsub_push
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/subtle"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
tlsint "github.com/influxdata/telegraf/internal/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultMaxBodySize is the default maximum request body size, in bytes.
|
||||||
|
// if the request body is over this size, we will return an HTTP 413 error.
|
||||||
|
// 500 MB
|
||||||
|
const defaultMaxBodySize = 500 * 1024 * 1024
|
||||||
|
const defaultMaxUndeliveredMessages = 1000
|
||||||
|
|
||||||
|
type PubSubPush struct {
|
||||||
|
ServiceAddress string
|
||||||
|
Token string
|
||||||
|
Path string
|
||||||
|
ReadTimeout internal.Duration
|
||||||
|
WriteTimeout internal.Duration
|
||||||
|
MaxBodySize internal.Size
|
||||||
|
AddMeta bool
|
||||||
|
|
||||||
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
|
|
||||||
|
tlsint.ServerConfig
|
||||||
|
parsers.Parser
|
||||||
|
|
||||||
|
listener net.Listener
|
||||||
|
server *http.Server
|
||||||
|
acc telegraf.TrackingAccumulator
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
mu *sync.Mutex
|
||||||
|
|
||||||
|
undelivered map[telegraf.TrackingID]chan bool
|
||||||
|
sem chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message defines the structure of a Google Pub/Sub message.
|
||||||
|
type Message struct {
|
||||||
|
Atts map[string]string `json:"attributes"`
|
||||||
|
Data string `json:"data"` // Data is base64 encoded data
|
||||||
|
}
|
||||||
|
|
||||||
|
// Payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push)
|
||||||
|
type Payload struct {
|
||||||
|
Msg Message `json:"message"`
|
||||||
|
Subscription string `json:"subscription"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## Address and port to host HTTP listener on
|
||||||
|
service_address = ":8080"
|
||||||
|
|
||||||
|
## Application secret to verify messages originate from Cloud Pub/Sub
|
||||||
|
# token = ""
|
||||||
|
|
||||||
|
## Path to listen to.
|
||||||
|
# path = "/"
|
||||||
|
|
||||||
|
## Maximum duration before timing out read of the request
|
||||||
|
# read_timeout = "10s"
|
||||||
|
## Maximum duration before timing out write of the response. This should be set to a value
|
||||||
|
## large enough that you can send at least 'metric_batch_size' number of messages within the
|
||||||
|
## duration.
|
||||||
|
# write_timeout = "10s"
|
||||||
|
|
||||||
|
## Maximum allowed http request body size in bytes.
|
||||||
|
## 0 means to use the default of 524,288,00 bytes (500 mebibytes)
|
||||||
|
# max_body_size = "500MB"
|
||||||
|
|
||||||
|
## Whether to add the pubsub metadata, such as message attributes and subscription as a tag.
|
||||||
|
# add_meta = false
|
||||||
|
|
||||||
|
## Optional. Maximum messages to read from PubSub that have not been written
|
||||||
|
## to an output. Defaults to 1000.
|
||||||
|
## For best throughput set based on the number of metrics within
|
||||||
|
## each message and the size of the output's metric_batch_size.
|
||||||
|
##
|
||||||
|
## For example, if each message contains 10 metrics and the output
|
||||||
|
## metric_batch_size is 1000, setting this to 100 will ensure that a
|
||||||
|
## full batch is collected and the write is triggered immediately without
|
||||||
|
## waiting until the next flush_interval.
|
||||||
|
# max_undelivered_messages = 1000
|
||||||
|
|
||||||
|
## Set one or more allowed client CA certificate file names to
|
||||||
|
## enable mutually authenticated TLS connections
|
||||||
|
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
|
||||||
|
|
||||||
|
## Add service certificate and key
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
data_format = "influx"
|
||||||
|
`
|
||||||
|
|
||||||
|
func (p *PubSubPush) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) Description() string {
|
||||||
|
return "Google Cloud Pub/Sub Push HTTP listener"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) Gather(_ telegraf.Accumulator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) SetParser(parser parsers.Parser) {
|
||||||
|
p.Parser = parser
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the http listener service.
|
||||||
|
func (p *PubSubPush) Start(acc telegraf.Accumulator) error {
|
||||||
|
if p.MaxBodySize.Size == 0 {
|
||||||
|
p.MaxBodySize.Size = defaultMaxBodySize
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.ReadTimeout.Duration < time.Second {
|
||||||
|
p.ReadTimeout.Duration = time.Second * 10
|
||||||
|
}
|
||||||
|
if p.WriteTimeout.Duration < time.Second {
|
||||||
|
p.WriteTimeout.Duration = time.Second * 10
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsConf, err := p.ServerConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.server = &http.Server{
|
||||||
|
Addr: p.ServiceAddress,
|
||||||
|
Handler: http.TimeoutHandler(p, p.WriteTimeout.Duration, "timed out processing metric"),
|
||||||
|
ReadTimeout: p.ReadTimeout.Duration,
|
||||||
|
TLSConfig: tlsConf,
|
||||||
|
}
|
||||||
|
|
||||||
|
p.ctx, p.cancel = context.WithCancel(context.Background())
|
||||||
|
p.wg = &sync.WaitGroup{}
|
||||||
|
p.acc = acc.WithTracking(p.MaxUndeliveredMessages)
|
||||||
|
p.sem = make(chan struct{}, p.MaxUndeliveredMessages)
|
||||||
|
p.undelivered = make(map[telegraf.TrackingID]chan bool)
|
||||||
|
p.mu = &sync.Mutex{}
|
||||||
|
|
||||||
|
p.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer p.wg.Done()
|
||||||
|
p.receiveDelivered()
|
||||||
|
}()
|
||||||
|
|
||||||
|
p.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer p.wg.Done()
|
||||||
|
if tlsConf != nil {
|
||||||
|
p.server.ListenAndServeTLS("", "")
|
||||||
|
} else {
|
||||||
|
p.server.ListenAndServe()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop cleans up all resources
|
||||||
|
func (p *PubSubPush) Stop() {
|
||||||
|
p.cancel()
|
||||||
|
p.server.Shutdown(p.ctx)
|
||||||
|
p.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
|
if req.URL.Path == p.Path {
|
||||||
|
p.AuthenticateIfSet(p.serveWrite, res, req)
|
||||||
|
} else {
|
||||||
|
p.AuthenticateIfSet(http.NotFound, res, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
|
select {
|
||||||
|
case <-req.Context().Done():
|
||||||
|
res.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
return
|
||||||
|
case <-p.ctx.Done():
|
||||||
|
res.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
return
|
||||||
|
case p.sem <- struct{}{}:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the content length is not too large for us to handle.
|
||||||
|
if req.ContentLength > p.MaxBodySize.Size {
|
||||||
|
res.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Method != http.MethodPost {
|
||||||
|
res.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
body := http.MaxBytesReader(res, req.Body, p.MaxBodySize.Size)
|
||||||
|
bytes, err := ioutil.ReadAll(body)
|
||||||
|
if err != nil {
|
||||||
|
res.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload Payload
|
||||||
|
if err = json.Unmarshal(bytes, &payload); err != nil {
|
||||||
|
log.Printf("E! [inputs.cloud_pubsub_push] Error decoding payload %s", err.Error())
|
||||||
|
res.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sDec, err := base64.StdEncoding.DecodeString(payload.Msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! [inputs.cloud_pubsub_push] Base64-Decode Failed %s", err.Error())
|
||||||
|
res.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, err := p.Parse(sDec)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("D! [inputs.cloud_pubsub_push] " + err.Error())
|
||||||
|
res.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.AddMeta {
|
||||||
|
for i := range metrics {
|
||||||
|
for k, v := range payload.Msg.Atts {
|
||||||
|
metrics[i].AddTag(k, v)
|
||||||
|
}
|
||||||
|
metrics[i].AddTag("subscription", payload.Subscription)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan bool, 1)
|
||||||
|
p.mu.Lock()
|
||||||
|
p.undelivered[p.acc.AddTrackingMetricGroup(metrics)] = ch
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-req.Context().Done():
|
||||||
|
res.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
return
|
||||||
|
case success := <-ch:
|
||||||
|
if success {
|
||||||
|
res.WriteHeader(http.StatusNoContent)
|
||||||
|
} else {
|
||||||
|
res.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) receiveDelivered() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-p.ctx.Done():
|
||||||
|
return
|
||||||
|
case info := <-p.acc.Delivered():
|
||||||
|
<-p.sem
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
ch, ok := p.undelivered[info.ID()]
|
||||||
|
if !ok {
|
||||||
|
p.mu.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(p.undelivered, info.ID())
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
if info.Delivered() {
|
||||||
|
ch <- true
|
||||||
|
} else {
|
||||||
|
ch <- false
|
||||||
|
log.Println("D! [inputs.cloud_pubsub_push] Metric group failed to process")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSubPush) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
|
||||||
|
if p.Token != "" {
|
||||||
|
if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 {
|
||||||
|
http.Error(res, "Unauthorized.", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handler(res, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("cloud_pubsub_push", func() telegraf.Input {
|
||||||
|
return &PubSubPush{
|
||||||
|
ServiceAddress: ":8080",
|
||||||
|
Path: "/",
|
||||||
|
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,216 @@
|
||||||
|
package cloud_pubsub_push
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/agent"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/internal/models"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServeHTTP(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
method string
|
||||||
|
path string
|
||||||
|
body io.Reader
|
||||||
|
status int
|
||||||
|
maxsize int64
|
||||||
|
expected string
|
||||||
|
fail bool
|
||||||
|
full bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "bad method get",
|
||||||
|
method: "GET",
|
||||||
|
path: "/",
|
||||||
|
status: http.StatusMethodNotAllowed,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post not found",
|
||||||
|
method: "POST",
|
||||||
|
path: "/allthings",
|
||||||
|
status: http.StatusNotFound,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post large date",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
status: http.StatusRequestEntityTooLarge,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post valid data",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusNoContent,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "fail write",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusServiceUnavailable,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
fail: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "full buffer",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusServiceUnavailable,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
full: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post invalid body",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
body: strings.NewReader(`invalid body`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post invalid data",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"not base 64 encoded data","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post invalid data format",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"bm90IHZhbGlkIGZvcm1hdHRlZCBkYXRh","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "post invalid structured body",
|
||||||
|
method: "POST",
|
||||||
|
path: "/",
|
||||||
|
maxsize: 500 * 1024 * 1024,
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
body: strings.NewReader(`{"message":{"attributes":{"thing":1},"data":"bm90IHZhbGlkIGZvcm1hdHRlZCBkYXRh"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
req, err := http.NewRequest(test.method, test.path, test.body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
rr := httptest.NewRecorder()
|
||||||
|
pubPush := &PubSubPush{
|
||||||
|
Path: "/",
|
||||||
|
MaxBodySize: internal.Size{
|
||||||
|
Size: test.maxsize,
|
||||||
|
},
|
||||||
|
sem: make(chan struct{}, 1),
|
||||||
|
undelivered: make(map[telegraf.TrackingID]chan bool),
|
||||||
|
mu: &sync.Mutex{},
|
||||||
|
WriteTimeout: internal.Duration{Duration: time.Second * 1},
|
||||||
|
}
|
||||||
|
|
||||||
|
pubPush.ctx, pubPush.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
if test.full {
|
||||||
|
// fill buffer with fake message
|
||||||
|
pubPush.sem <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
p, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
MetricName: "cloud_pubsub_push",
|
||||||
|
DataFormat: "influx",
|
||||||
|
})
|
||||||
|
pubPush.SetParser(p)
|
||||||
|
|
||||||
|
dst := make(chan telegraf.Metric, 1)
|
||||||
|
ro := models.NewRunningOutput("test", &testOutput{failWrite: test.fail}, &models.OutputConfig{}, 1, 1)
|
||||||
|
pubPush.acc = agent.NewAccumulator(&testMetricMaker{}, dst).WithTracking(1)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
pubPush.receiveDelivered()
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(status int, d chan telegraf.Metric) {
|
||||||
|
defer wg.Done()
|
||||||
|
for m := range d {
|
||||||
|
ro.AddMetric(m)
|
||||||
|
ro.Write()
|
||||||
|
}
|
||||||
|
}(test.status, dst)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), pubPush.WriteTimeout.Duration)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
pubPush.ServeHTTP(rr, req)
|
||||||
|
require.Equal(t, test.status, rr.Code, test.name)
|
||||||
|
|
||||||
|
if test.expected != "" {
|
||||||
|
require.Equal(t, test.expected, rr.Body.String(), test.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
pubPush.cancel()
|
||||||
|
cancel()
|
||||||
|
close(dst)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testMetricMaker struct{}
|
||||||
|
|
||||||
|
func (tm *testMetricMaker) Name() string {
|
||||||
|
return "TestPlugin"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
|
||||||
|
return metric
|
||||||
|
}
|
||||||
|
|
||||||
|
type testOutput struct {
|
||||||
|
// if true, mock a write failure
|
||||||
|
failWrite bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*testOutput) Connect() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*testOutput) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*testOutput) Description() string {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*testOutput) SampleConfig() string {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testOutput) Write(metrics []telegraf.Metric) error {
|
||||||
|
if t.failWrite {
|
||||||
|
return fmt.Errorf("failed write")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue