2016-02-10 23:28:52 +00:00
|
|
|
package natsconsumer
|
|
|
|
|
|
|
|
import (
|
2018-11-05 21:34:28 +00:00
|
|
|
"context"
|
2016-02-10 23:28:52 +00:00
|
|
|
"fmt"
|
2020-02-20 22:30:04 +00:00
|
|
|
"strings"
|
2016-02-10 23:28:52 +00:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
2019-08-02 20:10:14 +00:00
|
|
|
"github.com/influxdata/telegraf/internal/tls"
|
2016-02-10 23:28:52 +00:00
|
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
|
|
"github.com/influxdata/telegraf/plugins/parsers"
|
2020-02-20 22:30:04 +00:00
|
|
|
"github.com/nats-io/nats.go"
|
2016-02-10 23:28:52 +00:00
|
|
|
)
|
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
// defaultMaxUndeliveredMessages is the value init() assigns to
// MaxUndeliveredMessages for a freshly created plugin instance. It is never
// reassigned, so it is declared as a constant rather than as mutable
// package-level state.
const defaultMaxUndeliveredMessages = 1000
|
|
|
|
|
|
|
|
// empty is a zero-size token type; it is the element type of semaphore.
type empty struct{}
|
|
|
|
// semaphore is a channel of empty tokens. receiver() creates one with a
// capacity of MaxUndeliveredMessages and uses sends/receives on it to bound
// the number of messages that are in flight.
type semaphore chan empty
|
|
|
|
|
2016-02-10 23:28:52 +00:00
|
|
|
type natsError struct {
|
|
|
|
conn *nats.Conn
|
|
|
|
sub *nats.Subscription
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e natsError) Error() string {
|
|
|
|
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
|
|
|
|
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
|
|
|
|
}
|
|
|
|
|
|
|
|
type natsConsumer struct {
|
2020-02-20 22:30:04 +00:00
|
|
|
QueueGroup string `toml:"queue_group"`
|
|
|
|
Subjects []string `toml:"subjects"`
|
|
|
|
Servers []string `toml:"servers"`
|
|
|
|
Secure bool `toml:"secure"`
|
|
|
|
Username string `toml:"username"`
|
|
|
|
Password string `toml:"password"`
|
|
|
|
Credentials string `toml:"credentials"`
|
|
|
|
|
2019-08-02 20:10:14 +00:00
|
|
|
tls.ClientConfig
|
2016-02-10 23:28:52 +00:00
|
|
|
|
2019-09-23 22:39:50 +00:00
|
|
|
Log telegraf.Logger
|
|
|
|
|
2016-10-26 15:38:56 +00:00
|
|
|
// Client pending limits:
|
2018-11-05 21:34:28 +00:00
|
|
|
PendingMessageLimit int `toml:"pending_message_limit"`
|
|
|
|
PendingBytesLimit int `toml:"pending_bytes_limit"`
|
|
|
|
|
|
|
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
2016-10-26 15:38:56 +00:00
|
|
|
|
2018-10-19 22:46:20 +00:00
|
|
|
// Legacy metric buffer support; deprecated in v0.10.3
|
2016-02-12 10:05:33 +00:00
|
|
|
MetricBuffer int
|
2016-02-16 00:21:38 +00:00
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
conn *nats.Conn
|
|
|
|
subs []*nats.Subscription
|
2016-02-10 23:28:52 +00:00
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
parser parsers.Parser
|
2016-02-10 23:28:52 +00:00
|
|
|
// channel for all incoming NATS messages
|
|
|
|
in chan *nats.Msg
|
|
|
|
// channel for all NATS read errors
|
2018-11-05 21:34:28 +00:00
|
|
|
errs chan error
|
|
|
|
acc telegraf.TrackingAccumulator
|
|
|
|
wg sync.WaitGroup
|
|
|
|
cancel context.CancelFunc
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// sampleConfig is the example TOML configuration returned by SampleConfig().
// It is a raw string literal, so every line between the backticks is emitted
// verbatim; it must contain nothing but configuration text.
var sampleConfig = `
## urls of NATS servers
servers = ["nats://localhost:4222"]

## subject(s) to consume
subjects = ["telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

## Optional credentials
# username = ""
# password = ""

## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"

## Use Transport Layer Security
# secure = false

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
|
|
|
|
|
|
|
|
func (n *natsConsumer) SampleConfig() string {
|
|
|
|
return sampleConfig
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) Description() string {
|
|
|
|
return "Read metrics from NATS subject(s)"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) SetParser(parser parsers.Parser) {
|
|
|
|
n.parser = parser
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
|
|
|
|
select {
|
|
|
|
case n.errs <- natsError{conn: c, sub: s, err: e}:
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
|
2016-02-16 00:21:38 +00:00
|
|
|
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
|
2018-11-05 21:34:28 +00:00
|
|
|
n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
|
2016-02-16 00:21:38 +00:00
|
|
|
|
2016-02-10 23:28:52 +00:00
|
|
|
var connectErr error
|
|
|
|
|
2020-02-20 22:30:04 +00:00
|
|
|
options := []nats.Option{
|
|
|
|
nats.MaxReconnects(-1),
|
|
|
|
nats.ErrorHandler(n.natsErrHandler),
|
|
|
|
}
|
2016-10-26 14:45:33 +00:00
|
|
|
|
2019-08-02 20:10:14 +00:00
|
|
|
// override authentication, if any was specified
|
2020-02-20 22:30:04 +00:00
|
|
|
if n.Username != "" && n.Password != "" {
|
|
|
|
options = append(options, nats.UserInfo(n.Username, n.Password))
|
|
|
|
}
|
|
|
|
|
|
|
|
if n.Credentials != "" {
|
|
|
|
options = append(options, nats.UserCredentials(n.Credentials))
|
2019-08-02 20:10:14 +00:00
|
|
|
}
|
|
|
|
|
2019-08-02 21:59:28 +00:00
|
|
|
if n.Secure {
|
|
|
|
tlsConfig, err := n.ClientConfig.TLSConfig()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-02-20 22:30:04 +00:00
|
|
|
options = append(options, nats.Secure(tlsConfig))
|
2019-08-02 20:10:14 +00:00
|
|
|
}
|
2016-02-10 23:28:52 +00:00
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
if n.conn == nil || n.conn.IsClosed() {
|
2020-02-20 22:30:04 +00:00
|
|
|
n.conn, connectErr = nats.Connect(strings.Join(n.Servers, ","), options...)
|
2016-02-10 23:28:52 +00:00
|
|
|
if connectErr != nil {
|
|
|
|
return connectErr
|
|
|
|
}
|
|
|
|
|
|
|
|
// Setup message and error channels
|
|
|
|
n.errs = make(chan error)
|
|
|
|
|
2016-10-26 15:38:56 +00:00
|
|
|
n.in = make(chan *nats.Msg, 1000)
|
2016-02-10 23:28:52 +00:00
|
|
|
for _, subj := range n.Subjects {
|
2018-11-05 21:34:28 +00:00
|
|
|
sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
|
2016-10-26 15:38:56 +00:00
|
|
|
n.in <- m
|
|
|
|
})
|
2016-02-10 23:28:52 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-02-20 22:30:04 +00:00
|
|
|
|
2016-10-26 15:38:56 +00:00
|
|
|
// set the subscription pending limits
|
2020-02-20 22:30:04 +00:00
|
|
|
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
|
|
|
|
if err != nil {
|
2016-10-26 15:38:56 +00:00
|
|
|
return err
|
|
|
|
}
|
2020-02-20 22:30:04 +00:00
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
n.subs = append(n.subs, sub)
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
n.cancel = cancel
|
2016-02-10 23:28:52 +00:00
|
|
|
|
|
|
|
// Start the message reader
|
2016-10-26 15:38:56 +00:00
|
|
|
n.wg.Add(1)
|
2018-11-05 21:34:28 +00:00
|
|
|
go func() {
|
|
|
|
defer n.wg.Done()
|
|
|
|
go n.receiver(ctx)
|
|
|
|
}()
|
|
|
|
|
2019-09-23 22:39:50 +00:00
|
|
|
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v",
|
2018-11-05 21:34:28 +00:00
|
|
|
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
|
2016-02-10 23:28:52 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// receiver() reads all incoming messages from NATS, and parses them into
|
2016-02-12 10:05:33 +00:00
|
|
|
// telegraf metrics.
|
2018-11-05 21:34:28 +00:00
|
|
|
func (n *natsConsumer) receiver(ctx context.Context) {
|
|
|
|
sem := make(semaphore, n.MaxUndeliveredMessages)
|
|
|
|
|
2016-02-10 23:28:52 +00:00
|
|
|
for {
|
|
|
|
select {
|
2018-11-05 21:34:28 +00:00
|
|
|
case <-ctx.Done():
|
2016-02-10 23:28:52 +00:00
|
|
|
return
|
2018-11-05 21:34:28 +00:00
|
|
|
case <-n.acc.Delivered():
|
|
|
|
<-sem
|
2016-02-10 23:28:52 +00:00
|
|
|
case err := <-n.errs:
|
2019-09-23 22:39:50 +00:00
|
|
|
n.Log.Error(err)
|
2018-11-05 21:34:28 +00:00
|
|
|
case sem <- empty{}:
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case err := <-n.errs:
|
|
|
|
<-sem
|
2019-09-23 22:39:50 +00:00
|
|
|
n.Log.Error(err)
|
2018-11-05 21:34:28 +00:00
|
|
|
case <-n.acc.Delivered():
|
|
|
|
<-sem
|
|
|
|
<-sem
|
|
|
|
case msg := <-n.in:
|
|
|
|
metrics, err := n.parser.Parse(msg.Data)
|
|
|
|
if err != nil {
|
2019-09-23 22:39:50 +00:00
|
|
|
n.Log.Errorf("Subject: %s, error: %s", msg.Subject, err.Error())
|
2018-11-05 21:34:28 +00:00
|
|
|
<-sem
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
n.acc.AddTrackingMetricGroup(metrics)
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) clean() {
|
2018-11-05 21:34:28 +00:00
|
|
|
for _, sub := range n.subs {
|
2016-02-10 23:28:52 +00:00
|
|
|
if err := sub.Unsubscribe(); err != nil {
|
2019-09-23 22:39:50 +00:00
|
|
|
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
|
|
|
|
sub.Subject, sub.Queue, err.Error())
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-05 21:34:28 +00:00
|
|
|
if n.conn != nil && !n.conn.IsClosed() {
|
|
|
|
n.conn.Close()
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) Stop() {
|
2018-11-05 21:34:28 +00:00
|
|
|
n.cancel()
|
2016-10-26 15:38:56 +00:00
|
|
|
n.wg.Wait()
|
|
|
|
n.clean()
|
2016-02-10 23:28:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
inputs.Add("nats_consumer", func() telegraf.Input {
|
2016-10-26 15:38:56 +00:00
|
|
|
return &natsConsumer{
|
2018-11-05 21:34:28 +00:00
|
|
|
Servers: []string{"nats://localhost:4222"},
|
|
|
|
Secure: false,
|
|
|
|
Subjects: []string{"telegraf"},
|
|
|
|
QueueGroup: "telegraf_consumers",
|
|
|
|
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
|
|
|
|
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
|
|
|
|
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
|
2016-10-26 15:38:56 +00:00
|
|
|
}
|
2016-02-10 23:28:52 +00:00
|
|
|
})
|
|
|
|
}
|