nats_consumer: buffer incoming messages

fixes #1956
This commit is contained in:
Cameron Sparr 2016-10-26 16:38:56 +01:00
parent fc59757a1a
commit 2150510bd4
2 changed files with 43 additions and 14 deletions

View File

@ -28,12 +28,17 @@ type natsConsumer struct {
Servers []string Servers []string
Secure bool Secure bool
// Client pending limits:
PendingMessageLimit int
PendingBytesLimit int
// Legacy metric buffer support // Legacy metric buffer support
MetricBuffer int MetricBuffer int
parser parsers.Parser parser parsers.Parser
sync.Mutex sync.Mutex
wg sync.WaitGroup
Conn *nats.Conn Conn *nats.Conn
Subs []*nats.Subscription Subs []*nats.Subscription
@ -47,13 +52,18 @@ type natsConsumer struct {
var sampleConfig = ` var sampleConfig = `
## urls of NATS servers ## urls of NATS servers
servers = ["nats://localhost:4222"] # servers = ["nats://localhost:4222"]
## Use Transport Layer Security ## Use Transport Layer Security
secure = false # secure = false
## subject(s) to consume ## subject(s) to consume
subjects = ["telegraf"] # subjects = ["telegraf"]
## name a queue group ## name a queue group
queue_group = "telegraf_consumers" # queue_group = "telegraf_consumers"
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Data format to consume. ## Data format to consume.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
@ -112,12 +122,22 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.errs = make(chan error) n.errs = make(chan error)
n.Conn.SetErrorHandler(n.natsErrHandler) n.Conn.SetErrorHandler(n.natsErrHandler)
n.in = make(chan *nats.Msg) n.in = make(chan *nats.Msg, 1000)
for _, subj := range n.Subjects { for _, subj := range n.Subjects {
sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil { if err != nil {
return err return err
} }
// ensure that the subscription has been processed by the server
if err = n.Conn.Flush(); err != nil {
return err
}
// set the subscription pending limits
if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil {
return err
}
n.Subs = append(n.Subs, sub) n.Subs = append(n.Subs, sub)
} }
} }
@ -125,6 +145,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.done = make(chan struct{}) n.done = make(chan struct{})
// Start the message reader // Start the message reader
n.wg.Add(1)
go n.receiver() go n.receiver()
log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
@ -135,7 +156,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
// receiver() reads all incoming messages from NATS, and parses them into // receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics. // telegraf metrics.
func (n *natsConsumer) receiver() { func (n *natsConsumer) receiver() {
defer n.clean() defer n.wg.Done()
for { for {
select { select {
case <-n.done: case <-n.done:
@ -151,17 +172,11 @@ func (n *natsConsumer) receiver() {
for _, metric := range metrics { for _, metric := range metrics {
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
} }
} }
} }
} }
func (n *natsConsumer) clean() { func (n *natsConsumer) clean() {
n.Lock()
defer n.Unlock()
close(n.in)
close(n.errs)
for _, sub := range n.Subs { for _, sub := range n.Subs {
if err := sub.Unsubscribe(); err != nil { if err := sub.Unsubscribe(); err != nil {
log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n",
@ -177,6 +192,8 @@ func (n *natsConsumer) clean() {
func (n *natsConsumer) Stop() { func (n *natsConsumer) Stop() {
n.Lock() n.Lock()
close(n.done) close(n.done)
n.wg.Wait()
n.clean()
n.Unlock() n.Unlock()
} }
@ -186,6 +203,13 @@ func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
func init() { func init() {
inputs.Add("nats_consumer", func() telegraf.Input { inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{} return &natsConsumer{
Servers: []string{"nats://localhost:4222"},
Secure: false,
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
}
}) })
} }

View File

@ -39,6 +39,7 @@ func TestRunParser(t *testing.T) {
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
time.Sleep(time.Millisecond * 25) time.Sleep(time.Millisecond * 25)
@ -56,6 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(invalidMsg) in <- natsMsg(invalidMsg)
time.Sleep(time.Millisecond * 25) time.Sleep(time.Millisecond * 25)
@ -73,6 +75,7 @@ func TestRunParserAndGather(t *testing.T) {
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
time.Sleep(time.Millisecond * 25) time.Sleep(time.Millisecond * 25)
@ -91,6 +94,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgGraphite) in <- natsMsg(testMsgGraphite)
time.Sleep(time.Millisecond * 25) time.Sleep(time.Millisecond * 25)
@ -109,6 +113,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgJSON) in <- natsMsg(testMsgJSON)
time.Sleep(time.Millisecond * 25) time.Sleep(time.Millisecond * 25)