Add TLS & credentials configuration for nats_consumer input plugin (#6195)

This commit is contained in:
Mike Melnyk 2019-08-02 16:10:14 -04:00 committed by Daniel Nelson
parent 3c811c15b3
commit 0732b41b4b
2 changed files with 48 additions and 5 deletions

View File

@ -12,13 +12,22 @@ instances of telegraf can read from a NATS cluster in parallel.
[[inputs.nats_consumer]] [[inputs.nats_consumer]]
## urls of NATS servers ## urls of NATS servers
servers = ["nats://localhost:4222"] servers = ["nats://localhost:4222"]
## Use Transport Layer Security
secure = false
## subject(s) to consume ## subject(s) to consume
subjects = ["telegraf"] subjects = ["telegraf"]
## name a queue group ## name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
## Optional credentials
# username = ""
# password = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the limits for pending msgs and bytes for each subscription ## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios ## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536 # pending_message_limit = 65536

View File

@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats" nats "github.com/nats-io/go-nats"
@ -34,6 +35,10 @@ type natsConsumer struct {
QueueGroup string `toml:"queue_group"` QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"` Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
// Legacy; Should be deprecated
Secure bool `toml:"secure"` Secure bool `toml:"secure"`
// Client pending limits: // Client pending limits:
@ -61,13 +66,24 @@ type natsConsumer struct {
var sampleConfig = ` var sampleConfig = `
## urls of NATS servers ## urls of NATS servers
servers = ["nats://localhost:4222"] servers = ["nats://localhost:4222"]
## Use Transport Layer Security ## Deprecated: Use Transport Layer Security
secure = false secure = false
## subject(s) to consume ## subject(s) to consume
subjects = ["telegraf"] subjects = ["telegraf"]
## name a queue group ## name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
## Optional credentials
# username = ""
# password = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the limits for pending msgs and bytes for each subscription ## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios ## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536 # pending_message_limit = 65536
@ -125,7 +141,25 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
// override servers if any were specified // override servers if any were specified
opts.Servers = n.Servers opts.Servers = n.Servers
// override authentication, if any was specified
if n.Username != "" {
opts.User = n.Username
opts.Password = n.Password
}
// override TLS, if it was specified
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return err
}
if tlsConfig != nil {
// set NATS connection TLS options
opts.Secure = true
opts.TLSConfig = tlsConfig
} else {
// should be deprecated; use TLS
opts.Secure = n.Secure opts.Secure = n.Secure
}
if n.conn == nil || n.conn.IsClosed() { if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect() n.conn, connectErr = opts.Connect()