diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 6d59cce28..cbb85e016 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -28,12 +28,17 @@ type natsConsumer struct { Servers []string Secure bool + // Client pending limits: + PendingMessageLimit int + PendingBytesLimit int + // Legacy metric buffer support MetricBuffer int parser parsers.Parser sync.Mutex + wg sync.WaitGroup Conn *nats.Conn Subs []*nats.Subscription @@ -47,13 +52,18 @@ type natsConsumer struct { var sampleConfig = ` ## urls of NATS servers - servers = ["nats://localhost:4222"] + # servers = ["nats://localhost:4222"] ## Use Transport Layer Security - secure = false + # secure = false ## subject(s) to consume - subjects = ["telegraf"] + # subjects = ["telegraf"] ## name a queue group - queue_group = "telegraf_consumers" + # queue_group = "telegraf_consumers" + + ## Sets the limits for pending msgs and bytes for each subscription + ## These shouldn't need to be adjusted except in very high throughput scenarios + # pending_message_limit = 65536 + # pending_bytes_limit = 67108864 ## Data format to consume. ## Each data format has it's own unique set of configuration options, read @@ -112,12 +122,22 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.errs = make(chan error) n.Conn.SetErrorHandler(n.natsErrHandler) - n.in = make(chan *nats.Msg) + n.in = make(chan *nats.Msg, 1000) for _, subj := range n.Subjects { - sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) + sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { + n.in <- m + }) if err != nil { return err } + // ensure that the subscription has been processed by the server + if err = n.Conn.Flush(); err != nil { + return err + } + // set the subscription pending limits + if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil { + return err + } n.Subs = append(n.Subs, sub) } } @@ -125,6 +145,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.done = make(chan struct{}) // Start the message reader + n.wg.Add(1) go n.receiver() log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) @@ -135,7 +156,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { // receiver() reads all incoming messages from NATS, and parses them into // telegraf metrics. func (n *natsConsumer) receiver() { - defer n.clean() + defer n.wg.Done() for { select { case <-n.done: @@ -151,17 +172,11 @@ func (n *natsConsumer) receiver() { for _, metric := range metrics { n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } - } } } func (n *natsConsumer) clean() { - n.Lock() - defer n.Unlock() - close(n.in) - close(n.errs) - for _, sub := range n.Subs { if err := sub.Unsubscribe(); err != nil { log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", @@ -177,6 +192,8 @@ func (n *natsConsumer) clean() { func (n *natsConsumer) Stop() { n.Lock() close(n.done) + n.wg.Wait() + n.clean() n.Unlock() } @@ -186,6 +203,13 @@ func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { func init() { inputs.Add("nats_consumer", func() telegraf.Input { - return &natsConsumer{} + return &natsConsumer{ + Servers: []string{"nats://localhost:4222"}, + Secure: false, + Subjects: []string{"telegraf"}, + QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, + PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, + } }) } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 75fde66a6..206714b1a 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -39,6 +39,7 @@ func TestRunParser(t *testing.T) { defer close(n.done) n.parser, _ = parsers.NewInfluxParser() + n.wg.Add(1) go n.receiver() in <- natsMsg(testMsg) time.Sleep(time.Millisecond * 25) @@ -56,6 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) { defer close(n.done) n.parser, _ = parsers.NewInfluxParser() + n.wg.Add(1) go n.receiver() in <- natsMsg(invalidMsg) time.Sleep(time.Millisecond * 25) @@ -73,6 +75,7 @@ func TestRunParserAndGather(t *testing.T) { defer close(n.done) n.parser, _ = parsers.NewInfluxParser() + n.wg.Add(1) go n.receiver() in <- natsMsg(testMsg) time.Sleep(time.Millisecond * 25) @@ -91,6 +94,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { defer close(n.done) n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + n.wg.Add(1) go n.receiver() in <- natsMsg(testMsgGraphite) time.Sleep(time.Millisecond * 25) @@ -109,6 +113,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { defer close(n.done) n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + n.wg.Add(1) go n.receiver() in <- natsMsg(testMsgJSON) time.Sleep(time.Millisecond * 25)