324 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			324 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
| package cloud_pubsub_push
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"crypto/subtle"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/json"
 | |
| 	"io/ioutil"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/influxdata/telegraf"
 | |
| 	"github.com/influxdata/telegraf/internal"
 | |
| 	tlsint "github.com/influxdata/telegraf/internal/tls"
 | |
| 	"github.com/influxdata/telegraf/plugins/inputs"
 | |
| 	"github.com/influxdata/telegraf/plugins/parsers"
 | |
| )
 | |
| 
 | |
| // defaultMaxBodySize is the default maximum request body size, in bytes.
 | |
| // if the request body is over this size, we will return an HTTP 413 error.
 | |
| // 500 MB
 | |
| const defaultMaxBodySize = 500 * 1024 * 1024
 | |
| const defaultMaxUndeliveredMessages = 1000
 | |
| 
 | |
| type PubSubPush struct {
 | |
| 	ServiceAddress string
 | |
| 	Token          string
 | |
| 	Path           string
 | |
| 	ReadTimeout    internal.Duration
 | |
| 	WriteTimeout   internal.Duration
 | |
| 	MaxBodySize    internal.Size
 | |
| 	AddMeta        bool
 | |
| 	Log            telegraf.Logger
 | |
| 
 | |
| 	MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
 | |
| 
 | |
| 	tlsint.ServerConfig
 | |
| 	parsers.Parser
 | |
| 
 | |
| 	listener net.Listener
 | |
| 	server   *http.Server
 | |
| 	acc      telegraf.TrackingAccumulator
 | |
| 	ctx      context.Context
 | |
| 	cancel   context.CancelFunc
 | |
| 	wg       *sync.WaitGroup
 | |
| 	mu       *sync.Mutex
 | |
| 
 | |
| 	undelivered map[telegraf.TrackingID]chan bool
 | |
| 	sem         chan struct{}
 | |
| }
 | |
| 
 | |
| // Message defines the structure of a Google Pub/Sub message.
 | |
| type Message struct {
 | |
| 	Atts map[string]string `json:"attributes"`
 | |
| 	Data string            `json:"data"` // Data is base64 encoded data
 | |
| }
 | |
| 
 | |
| // Payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push)
 | |
| type Payload struct {
 | |
| 	Msg          Message `json:"message"`
 | |
| 	Subscription string  `json:"subscription"`
 | |
| }
 | |
| 
 | |
| const sampleConfig = `
 | |
|   ## Address and port to host HTTP listener on
 | |
|   service_address = ":8080"
 | |
| 
 | |
|   ## Application secret to verify messages originate from Cloud Pub/Sub
 | |
|   # token = ""
 | |
| 
 | |
|   ## Path to listen to.
 | |
|   # path = "/"
 | |
| 
 | |
|   ## Maximum duration before timing out read of the request
 | |
|   # read_timeout = "10s"
 | |
|   ## Maximum duration before timing out write of the response. This should be set to a value
 | |
|   ## large enough that you can send at least 'metric_batch_size' number of messages within the
 | |
|   ## duration.
 | |
|   # write_timeout = "10s"
 | |
| 
 | |
|   ## Maximum allowed http request body size in bytes.
 | |
|   ## 0 means to use the default of 524,288,00 bytes (500 mebibytes)
 | |
|   # max_body_size = "500MB"
 | |
| 
 | |
|   ## Whether to add the pubsub metadata, such as message attributes and subscription as a tag.
 | |
|   # add_meta = false
 | |
| 
 | |
|   ## Optional. Maximum messages to read from PubSub that have not been written
 | |
|   ## to an output. Defaults to 1000.
 | |
|   ## For best throughput set based on the number of metrics within
 | |
|   ## each message and the size of the output's metric_batch_size.
 | |
|   ##
 | |
|   ## For example, if each message contains 10 metrics and the output
 | |
|   ## metric_batch_size is 1000, setting this to 100 will ensure that a
 | |
|   ## full batch is collected and the write is triggered immediately without
 | |
|   ## waiting until the next flush_interval.
 | |
|   # max_undelivered_messages = 1000
 | |
| 
 | |
|   ## Set one or more allowed client CA certificate file names to
 | |
|   ## enable mutually authenticated TLS connections
 | |
|   # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
 | |
| 
 | |
|   ## Add service certificate and key
 | |
|   # tls_cert = "/etc/telegraf/cert.pem"
 | |
|   # tls_key = "/etc/telegraf/key.pem"
 | |
| 
 | |
|   ## Data format to consume.
 | |
|   ## Each data format has its own unique set of configuration options, read
 | |
|   ## more about them here:
 | |
|   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
 | |
|   data_format = "influx"
 | |
| `
 | |
| 
 | |
| func (p *PubSubPush) SampleConfig() string {
 | |
| 	return sampleConfig
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) Description() string {
 | |
| 	return "Google Cloud Pub/Sub Push HTTP listener"
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) Gather(_ telegraf.Accumulator) error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) SetParser(parser parsers.Parser) {
 | |
| 	p.Parser = parser
 | |
| }
 | |
| 
 | |
| // Start starts the http listener service.
 | |
| func (p *PubSubPush) Start(acc telegraf.Accumulator) error {
 | |
| 	if p.MaxBodySize.Size == 0 {
 | |
| 		p.MaxBodySize.Size = defaultMaxBodySize
 | |
| 	}
 | |
| 
 | |
| 	if p.ReadTimeout.Duration < time.Second {
 | |
| 		p.ReadTimeout.Duration = time.Second * 10
 | |
| 	}
 | |
| 	if p.WriteTimeout.Duration < time.Second {
 | |
| 		p.WriteTimeout.Duration = time.Second * 10
 | |
| 	}
 | |
| 
 | |
| 	tlsConf, err := p.ServerConfig.TLSConfig()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	p.server = &http.Server{
 | |
| 		Addr:        p.ServiceAddress,
 | |
| 		Handler:     http.TimeoutHandler(p, p.WriteTimeout.Duration, "timed out processing metric"),
 | |
| 		ReadTimeout: p.ReadTimeout.Duration,
 | |
| 		TLSConfig:   tlsConf,
 | |
| 	}
 | |
| 
 | |
| 	p.ctx, p.cancel = context.WithCancel(context.Background())
 | |
| 	p.wg = &sync.WaitGroup{}
 | |
| 	p.acc = acc.WithTracking(p.MaxUndeliveredMessages)
 | |
| 	p.sem = make(chan struct{}, p.MaxUndeliveredMessages)
 | |
| 	p.undelivered = make(map[telegraf.TrackingID]chan bool)
 | |
| 	p.mu = &sync.Mutex{}
 | |
| 
 | |
| 	p.wg.Add(1)
 | |
| 	go func() {
 | |
| 		defer p.wg.Done()
 | |
| 		p.receiveDelivered()
 | |
| 	}()
 | |
| 
 | |
| 	p.wg.Add(1)
 | |
| 	go func() {
 | |
| 		defer p.wg.Done()
 | |
| 		if tlsConf != nil {
 | |
| 			p.server.ListenAndServeTLS("", "")
 | |
| 		} else {
 | |
| 			p.server.ListenAndServe()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stop cleans up all resources
 | |
| func (p *PubSubPush) Stop() {
 | |
| 	p.cancel()
 | |
| 	p.server.Shutdown(p.ctx)
 | |
| 	p.wg.Wait()
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) {
 | |
| 	if req.URL.Path == p.Path {
 | |
| 		p.AuthenticateIfSet(p.serveWrite, res, req)
 | |
| 	} else {
 | |
| 		p.AuthenticateIfSet(http.NotFound, res, req)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) {
 | |
| 	select {
 | |
| 	case <-req.Context().Done():
 | |
| 		res.WriteHeader(http.StatusServiceUnavailable)
 | |
| 		return
 | |
| 	case <-p.ctx.Done():
 | |
| 		res.WriteHeader(http.StatusServiceUnavailable)
 | |
| 		return
 | |
| 	case p.sem <- struct{}{}:
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	// Check that the content length is not too large for us to handle.
 | |
| 	if req.ContentLength > p.MaxBodySize.Size {
 | |
| 		res.WriteHeader(http.StatusRequestEntityTooLarge)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if req.Method != http.MethodPost {
 | |
| 		res.WriteHeader(http.StatusMethodNotAllowed)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	body := http.MaxBytesReader(res, req.Body, p.MaxBodySize.Size)
 | |
| 	bytes, err := ioutil.ReadAll(body)
 | |
| 	if err != nil {
 | |
| 		res.WriteHeader(http.StatusRequestEntityTooLarge)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var payload Payload
 | |
| 	if err = json.Unmarshal(bytes, &payload); err != nil {
 | |
| 		p.Log.Errorf("Error decoding payload %s", err.Error())
 | |
| 		res.WriteHeader(http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	sDec, err := base64.StdEncoding.DecodeString(payload.Msg.Data)
 | |
| 	if err != nil {
 | |
| 		p.Log.Errorf("Base64-decode failed %s", err.Error())
 | |
| 		res.WriteHeader(http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	metrics, err := p.Parse(sDec)
 | |
| 	if err != nil {
 | |
| 		p.Log.Debug(err.Error())
 | |
| 		res.WriteHeader(http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if p.AddMeta {
 | |
| 		for i := range metrics {
 | |
| 			for k, v := range payload.Msg.Atts {
 | |
| 				metrics[i].AddTag(k, v)
 | |
| 			}
 | |
| 			metrics[i].AddTag("subscription", payload.Subscription)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	ch := make(chan bool, 1)
 | |
| 	p.mu.Lock()
 | |
| 	p.undelivered[p.acc.AddTrackingMetricGroup(metrics)] = ch
 | |
| 	p.mu.Unlock()
 | |
| 
 | |
| 	select {
 | |
| 	case <-req.Context().Done():
 | |
| 		res.WriteHeader(http.StatusServiceUnavailable)
 | |
| 		return
 | |
| 	case success := <-ch:
 | |
| 		if success {
 | |
| 			res.WriteHeader(http.StatusNoContent)
 | |
| 		} else {
 | |
| 			res.WriteHeader(http.StatusInternalServerError)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) receiveDelivered() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-p.ctx.Done():
 | |
| 			return
 | |
| 		case info := <-p.acc.Delivered():
 | |
| 			<-p.sem
 | |
| 
 | |
| 			p.mu.Lock()
 | |
| 			ch, ok := p.undelivered[info.ID()]
 | |
| 			if !ok {
 | |
| 				p.mu.Unlock()
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			delete(p.undelivered, info.ID())
 | |
| 			p.mu.Unlock()
 | |
| 
 | |
| 			if info.Delivered() {
 | |
| 				ch <- true
 | |
| 			} else {
 | |
| 				ch <- false
 | |
| 				p.Log.Debug("Metric group failed to process")
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *PubSubPush) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
 | |
| 	if p.Token != "" {
 | |
| 		if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 {
 | |
| 			http.Error(res, "Unauthorized.", http.StatusUnauthorized)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	handler(res, req)
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	inputs.Add("cloud_pubsub_push", func() telegraf.Input {
 | |
| 		return &PubSubPush{
 | |
| 			ServiceAddress:         ":8080",
 | |
| 			Path:                   "/",
 | |
| 			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
 | |
| 		}
 | |
| 	})
 | |
| }
 |