diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index e03648036..765505c3e 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -15,6 +15,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/cgroup" _ "github.com/influxdata/telegraf/plugins/inputs/chrony" _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" + _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/inputs/conntrack" _ "github.com/influxdata/telegraf/plugins/inputs/consul" diff --git a/plugins/inputs/cloud_pubsub_push/README.md b/plugins/inputs/cloud_pubsub_push/README.md new file mode 100644 index 000000000..76725c997 --- /dev/null +++ b/plugins/inputs/cloud_pubsub_push/README.md @@ -0,0 +1,72 @@ +# Google Cloud PubSub Push Input Service Plugin + +The Google Cloud PubSub Push listener is a service input plugin that listens for messages sent via an HTTP POST from [Google Cloud PubSub][pubsub]. +The plugin expects messages in Google's Pub/Sub JSON Format ONLY. +The intent of the plugin is to allow Telegraf to serve as an endpoint of the Google Pub/Sub 'Push' service. +Google's PubSub service will **only** send over HTTPS/TLS so this plugin must be behind a valid proxy or must be configured to use TLS. + +Enable TLS by specifying the file names of a service TLS certificate and key. + +Enable mutually authenticated TLS and authorize client connections by signing certificate authority by including a list of allowed CA certificate file names in `tls_allowed_cacerts`. + + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.cloud_pubsub_push]] + ## Address and port to host HTTP listener on + service_address = ":8080" + + ## Application secret to verify messages originate from Cloud Pub/Sub + # token = "" + + ## Path to listen to. + # path = "/" + + ## Maximum duration before timing out read of the request + # read_timeout = "10s" + ## Maximum duration before timing out write of the response. This should be set to a value + ## large enough that you can send at least 'metric_batch_size' number of messages within the + ## duration. + # write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 524,288,00 bytes (500 mebibytes) + # max_body_size = "500MB" + + ## Whether to add the pubsub metadata, such as message attributes and subscription as a tag. + # add_meta = false + + ## Optional. Maximum messages to read from PubSub that have not been written + ## to an output. Defaults to 1000. + ## For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message contains 10 metrics and the output + ## metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +This plugin assumes you have already created a PUSH subscription for a given +PubSub topic. + +[pubsub]: https://cloud.google.com/pubsub +[input data formats]: /docs/DATA_FORMATS_INPUT.md diff --git a/plugins/inputs/cloud_pubsub_push/pubsub_push.go b/plugins/inputs/cloud_pubsub_push/pubsub_push.go new file mode 100644 index 000000000..8b83a440d --- /dev/null +++ b/plugins/inputs/cloud_pubsub_push/pubsub_push.go @@ -0,0 +1,323 @@ +package cloud_pubsub_push + +import ( + "context" + "crypto/subtle" + "encoding/base64" + "encoding/json" + "io/ioutil" + "log" + "net" + "net/http" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +// defaultMaxBodySize is the default maximum request body size, in bytes. +// if the request body is over this size, we will return an HTTP 413 error. +// 500 MB +const defaultMaxBodySize = 500 * 1024 * 1024 +const defaultMaxUndeliveredMessages = 1000 + +type PubSubPush struct { + ServiceAddress string + Token string + Path string + ReadTimeout internal.Duration + WriteTimeout internal.Duration + MaxBodySize internal.Size + AddMeta bool + + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + + tlsint.ServerConfig + parsers.Parser + + listener net.Listener + server *http.Server + acc telegraf.TrackingAccumulator + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup + mu *sync.Mutex + + undelivered map[telegraf.TrackingID]chan bool + sem chan struct{} +} + +// Message defines the structure of a Google Pub/Sub message. +type Message struct { + Atts map[string]string `json:"attributes"` + Data string `json:"data"` // Data is base64 encoded data +} + +// Payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push) +type Payload struct { + Msg Message `json:"message"` + Subscription string `json:"subscription"` +} + +const sampleConfig = ` + ## Address and port to host HTTP listener on + service_address = ":8080" + + ## Application secret to verify messages originate from Cloud Pub/Sub + # token = "" + + ## Path to listen to. + # path = "/" + + ## Maximum duration before timing out read of the request + # read_timeout = "10s" + ## Maximum duration before timing out write of the response. This should be set to a value + ## large enough that you can send at least 'metric_batch_size' number of messages within the + ## duration. + # write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 524,288,00 bytes (500 mebibytes) + # max_body_size = "500MB" + + ## Whether to add the pubsub metadata, such as message attributes and subscription as a tag. + # add_meta = false + + ## Optional. Maximum messages to read from PubSub that have not been written + ## to an output. Defaults to 1000. + ## For best throughput set based on the number of metrics within + ## each message and the size of the output's metric_batch_size. + ## + ## For example, if each message contains 10 metrics and the output + ## metric_batch_size is 1000, setting this to 100 will ensure that a + ## full batch is collected and the write is triggered immediately without + ## waiting until the next flush_interval. + # max_undelivered_messages = 1000 + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (p *PubSubPush) SampleConfig() string { + return sampleConfig +} + +func (p *PubSubPush) Description() string { + return "Google Cloud Pub/Sub Push HTTP listener" +} + +func (p *PubSubPush) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (p *PubSubPush) SetParser(parser parsers.Parser) { + p.Parser = parser +} + +// Start starts the http listener service. +func (p *PubSubPush) Start(acc telegraf.Accumulator) error { + if p.MaxBodySize.Size == 0 { + p.MaxBodySize.Size = defaultMaxBodySize + } + + if p.ReadTimeout.Duration < time.Second { + p.ReadTimeout.Duration = time.Second * 10 + } + if p.WriteTimeout.Duration < time.Second { + p.WriteTimeout.Duration = time.Second * 10 + } + + tlsConf, err := p.ServerConfig.TLSConfig() + if err != nil { + return err + } + + p.server = &http.Server{ + Addr: p.ServiceAddress, + Handler: http.TimeoutHandler(p, p.WriteTimeout.Duration, "timed out processing metric"), + ReadTimeout: p.ReadTimeout.Duration, + TLSConfig: tlsConf, + } + + p.ctx, p.cancel = context.WithCancel(context.Background()) + p.wg = &sync.WaitGroup{} + p.acc = acc.WithTracking(p.MaxUndeliveredMessages) + p.sem = make(chan struct{}, p.MaxUndeliveredMessages) + p.undelivered = make(map[telegraf.TrackingID]chan bool) + p.mu = &sync.Mutex{} + + p.wg.Add(1) + go func() { + defer p.wg.Done() + p.receiveDelivered() + }() + + p.wg.Add(1) + go func() { + defer p.wg.Done() + if tlsConf != nil { + p.server.ListenAndServeTLS("", "") + } else { + p.server.ListenAndServe() + } + }() + + return nil +} + +// Stop cleans up all resources +func (p *PubSubPush) Stop() { + p.cancel() + p.server.Shutdown(p.ctx) + p.wg.Wait() +} + +func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) { + if req.URL.Path == p.Path { + p.AuthenticateIfSet(p.serveWrite, res, req) + } else { + p.AuthenticateIfSet(http.NotFound, res, req) + } +} + +func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) { + select { + case <-req.Context().Done(): + res.WriteHeader(http.StatusServiceUnavailable) + return + case <-p.ctx.Done(): + res.WriteHeader(http.StatusServiceUnavailable) + return + case p.sem <- struct{}{}: + break + } + + // Check that the content length is not too large for us to handle. + if req.ContentLength > p.MaxBodySize.Size { + res.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + if req.Method != http.MethodPost { + res.WriteHeader(http.StatusMethodNotAllowed) + return + } + + body := http.MaxBytesReader(res, req.Body, p.MaxBodySize.Size) + bytes, err := ioutil.ReadAll(body) + if err != nil { + res.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + var payload Payload + if err = json.Unmarshal(bytes, &payload); err != nil { + log.Printf("E! [inputs.cloud_pubsub_push] Error decoding payload %s", err.Error()) + res.WriteHeader(http.StatusBadRequest) + return + } + + sDec, err := base64.StdEncoding.DecodeString(payload.Msg.Data) + if err != nil { + log.Printf("E! [inputs.cloud_pubsub_push] Base64-Decode Failed %s", err.Error()) + res.WriteHeader(http.StatusBadRequest) + return + } + + metrics, err := p.Parse(sDec) + if err != nil { + log.Println("D! [inputs.cloud_pubsub_push] " + err.Error()) + res.WriteHeader(http.StatusBadRequest) + return + } + + if p.AddMeta { + for i := range metrics { + for k, v := range payload.Msg.Atts { + metrics[i].AddTag(k, v) + } + metrics[i].AddTag("subscription", payload.Subscription) + } + } + + ch := make(chan bool, 1) + p.mu.Lock() + p.undelivered[p.acc.AddTrackingMetricGroup(metrics)] = ch + p.mu.Unlock() + + select { + case <-req.Context().Done(): + res.WriteHeader(http.StatusServiceUnavailable) + return + case success := <-ch: + if success { + res.WriteHeader(http.StatusNoContent) + } else { + res.WriteHeader(http.StatusInternalServerError) + } + } +} + +func (p *PubSubPush) receiveDelivered() { + for { + select { + case <-p.ctx.Done(): + return + case info := <-p.acc.Delivered(): + <-p.sem + + p.mu.Lock() + ch, ok := p.undelivered[info.ID()] + if !ok { + p.mu.Unlock() + continue + } + + delete(p.undelivered, info.ID()) + p.mu.Unlock() + + if info.Delivered() { + ch <- true + } else { + ch <- false + log.Println("D! [inputs.cloud_pubsub_push] Metric group failed to process") + } + } + } +} + +func (p *PubSubPush) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { + if p.Token != "" { + if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 { + http.Error(res, "Unauthorized.", http.StatusUnauthorized) + return + } + } + + handler(res, req) +} + +func init() { + inputs.Add("cloud_pubsub_push", func() telegraf.Input { + return &PubSubPush{ + ServiceAddress: ":8080", + Path: "/", + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + } + }) +} diff --git a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go new file mode 100644 index 000000000..57734c705 --- /dev/null +++ b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go @@ -0,0 +1,216 @@ +package cloud_pubsub_push + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/parsers" +) + +func TestServeHTTP(t *testing.T) { + tests := []struct { + name string + method string + path string + body io.Reader + status int + maxsize int64 + expected string + fail bool + full bool + }{ + { + name: "bad method get", + method: "GET", + path: "/", + status: http.StatusMethodNotAllowed, + }, + { + name: "post not found", + method: "POST", + path: "/allthings", + status: http.StatusNotFound, + }, + { + name: "post large date", + method: "POST", + path: "/", + status: http.StatusRequestEntityTooLarge, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + }, + { + name: "post valid data", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusNoContent, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + }, + { + name: "fail write", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusServiceUnavailable, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + fail: true, + }, + { + name: "full buffer", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusServiceUnavailable, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"dGVzdGluZ0dvb2dsZSxzZW5zb3I9Ym1lXzI4MCB0ZW1wX2M9MjMuOTUsaHVtaWRpdHk9NjIuODMgMTUzNjk1Mjk3NDU1MzUxMDIzMQ==","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + full: true, + }, + { + name: "post invalid body", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusBadRequest, + body: strings.NewReader(`invalid body`), + }, + { + name: "post invalid data", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusBadRequest, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"not base 64 encoded data","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + }, + { + name: "post invalid data format", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusBadRequest, + body: strings.NewReader(`{"message":{"attributes":{"deviceId":"myPi","deviceNumId":"2808946627307959","deviceRegistryId":"my-registry","deviceRegistryLocation":"us-central1","projectId":"conference-demos","subFolder":""},"data":"bm90IHZhbGlkIGZvcm1hdHRlZCBkYXRh","messageId":"204004313210337","message_id":"204004313210337","publishTime":"2018-09-14T19:22:54.587Z","publish_time":"2018-09-14T19:22:54.587Z"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + }, + { + name: "post invalid structured body", + method: "POST", + path: "/", + maxsize: 500 * 1024 * 1024, + status: http.StatusBadRequest, + body: strings.NewReader(`{"message":{"attributes":{"thing":1},"data":"bm90IHZhbGlkIGZvcm1hdHRlZCBkYXRh"},"subscription":"projects/conference-demos/subscriptions/my-subscription"}`), + }, + } + + for _, test := range tests { + wg := &sync.WaitGroup{} + req, err := http.NewRequest(test.method, test.path, test.body) + require.NoError(t, err) + + rr := httptest.NewRecorder() + pubPush := &PubSubPush{ + Path: "/", + MaxBodySize: internal.Size{ + Size: test.maxsize, + }, + sem: make(chan struct{}, 1), + undelivered: make(map[telegraf.TrackingID]chan bool), + mu: &sync.Mutex{}, + WriteTimeout: internal.Duration{Duration: time.Second * 1}, + } + + pubPush.ctx, pubPush.cancel = context.WithCancel(context.Background()) + + if test.full { + // fill buffer with fake message + pubPush.sem <- struct{}{} + } + + p, _ := parsers.NewParser(&parsers.Config{ + MetricName: "cloud_pubsub_push", + DataFormat: "influx", + }) + pubPush.SetParser(p) + + dst := make(chan telegraf.Metric, 1) + ro := models.NewRunningOutput("test", &testOutput{failWrite: test.fail}, &models.OutputConfig{}, 1, 1) + pubPush.acc = agent.NewAccumulator(&testMetricMaker{}, dst).WithTracking(1) + + wg.Add(1) + go func() { + defer wg.Done() + pubPush.receiveDelivered() + }() + + wg.Add(1) + go func(status int, d chan telegraf.Metric) { + defer wg.Done() + for m := range d { + ro.AddMetric(m) + ro.Write() + } + }(test.status, dst) + + ctx, cancel := context.WithTimeout(req.Context(), pubPush.WriteTimeout.Duration) + req = req.WithContext(ctx) + + pubPush.ServeHTTP(rr, req) + require.Equal(t, test.status, rr.Code, test.name) + + if test.expected != "" { + require.Equal(t, test.expected, rr.Body.String(), test.name) + } + + pubPush.cancel() + cancel() + close(dst) + wg.Wait() + } +} + +type testMetricMaker struct{} + +func (tm *testMetricMaker) Name() string { + return "TestPlugin" +} + +func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { + return metric +} + +type testOutput struct { + // if true, mock a write failure + failWrite bool +} + +func (*testOutput) Connect() error { + return nil +} + +func (*testOutput) Close() error { + return nil +} + +func (*testOutput) Description() string { + return "" +} + +func (*testOutput) SampleConfig() string { + return "" +} + +func (t *testOutput) Write(metrics []telegraf.Metric) error { + if t.failWrite { + return fmt.Errorf("failed write") + } + return nil +}