package natsconsumer import ( "fmt" "log" "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/nats-io/nats" ) type natsError struct { conn *nats.Conn sub *nats.Subscription err error } func (e natsError) Error() string { return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s", e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue) } type natsConsumer struct { QueueGroup string Subjects []string Servers []string Secure bool // Client pending limits: PendingMessageLimit int PendingBytesLimit int // Legacy metric buffer support MetricBuffer int parser parsers.Parser sync.Mutex wg sync.WaitGroup Conn *nats.Conn Subs []*nats.Subscription // channel for all incoming NATS messages in chan *nats.Msg // channel for all NATS read errors errs chan error done chan struct{} acc telegraf.Accumulator } var sampleConfig = ` ## urls of NATS servers # servers = ["nats://localhost:4222"] ## Use Transport Layer Security # secure = false ## subject(s) to consume # subjects = ["telegraf"] ## name a queue group # queue_group = "telegraf_consumers" ## Sets the limits for pending msgs and bytes for each subscription ## These shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 # pending_bytes_limit = 67108864 ## Data format to consume. ## Each data format has it's own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ` func (n *natsConsumer) SampleConfig() string { return sampleConfig } func (n *natsConsumer) Description() string { return "Read metrics from NATS subject(s)" } func (n *natsConsumer) SetParser(parser parsers.Parser) { n.parser = parser } func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) { select { case n.errs <- natsError{conn: c, sub: s, err: e}: default: return } } // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.Lock() defer n.Unlock() n.acc = acc var connectErr error // set default NATS connection options opts := nats.DefaultOptions // override max reconnection tries opts.MaxReconnect = -1 // override servers if any were specified opts.Servers = n.Servers opts.Secure = n.Secure if n.Conn == nil || n.Conn.IsClosed() { n.Conn, connectErr = opts.Connect() if connectErr != nil { return connectErr } // Setup message and error channels n.errs = make(chan error) n.Conn.SetErrorHandler(n.natsErrHandler) n.in = make(chan *nats.Msg, 1000) for _, subj := range n.Subjects { sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { n.in <- m }) if err != nil { return err } // ensure that the subscription has been processed by the server if err = n.Conn.Flush(); err != nil { return err } // set the subscription pending limits if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil { return err } n.Subs = append(n.Subs, sub) } } n.done = make(chan struct{}) // Start the message reader n.wg.Add(1) go n.receiver() log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) return nil } // receiver() reads all incoming messages from NATS, and parses them into // telegraf metrics. func (n *natsConsumer) receiver() { defer n.wg.Done() for { select { case <-n.done: return case err := <-n.errs: log.Printf("E! error reading from %s\n", err.Error()) case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error()) } for _, metric := range metrics { n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } } } } func (n *natsConsumer) clean() { for _, sub := range n.Subs { if err := sub.Unsubscribe(); err != nil { log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", sub.Subject, sub.Queue, err.Error()) } } if n.Conn != nil && !n.Conn.IsClosed() { n.Conn.Close() } } func (n *natsConsumer) Stop() { n.Lock() close(n.done) n.wg.Wait() n.clean() n.Unlock() } func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { return nil } func init() { inputs.Add("nats_consumer", func() telegraf.Input { return &natsConsumer{ Servers: []string{"nats://localhost:4222"}, Secure: false, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, } }) }