From 79ff743064c5d61ecad86661f0bfc8073e628a6c Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 20 Feb 2020 22:30:04 +0000 Subject: [PATCH] Add support for credentials file to nats_consumer and nats output (#7022) --- docs/LICENSE_OF_DEPENDENCIES.md | 6 +- go.mod | 5 +- go.sum | 20 +++++-- plugins/inputs/nats/nats.go | 2 +- plugins/inputs/nats_consumer/README.md | 3 + plugins/inputs/nats_consumer/nats_consumer.go | 56 ++++++++++--------- plugins/outputs/nats/README.md | 4 ++ plugins/outputs/nats/nats.go | 41 +++++++------- 8 files changed, 78 insertions(+), 59 deletions(-) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 71636a0b8..f22e3c7e9 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -83,8 +83,10 @@ following works: - github.com/mitchellh/mapstructure [MIT License](https://github.com/mitchellh/mapstructure/blob/master/LICENSE) - github.com/multiplay/go-ts3 [BSD 2-Clause "Simplified" License](https://github.com/multiplay/go-ts3/blob/master/LICENSE) - github.com/naoina/go-stringutil [MIT License](https://github.com/naoina/go-stringutil/blob/master/LICENSE) -- github.com/nats-io/gnatsd [Apache License 2.0](https://github.com/nats-io/gnatsd/blob/master/LICENSE) -- github.com/nats-io/go-nats [Apache License 2.0](https://github.com/nats-io/go-nats/blob/master/LICENSE) +- github.com/nats-io/nats-server [Apache License 2.0](https://github.com/nats-io/nats-server/blob/master/LICENSE) +- github.com/nats-io/nats.go [Apache License 2.0](https://github.com/nats-io/nats.go/blob/master/LICENSE) +- github.com/nats-io/jwt [Apache License 2.0](https://github.com/nats-io/jwt/blob/master/LICENSE) +- github.com/nats-io/nkeys [Apache License 2.0](https://github.com/nats-io/nkeys/blob/master/LICENSE) - github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE) - github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE) - github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE) diff --git a/go.mod b/go.mod index bea4cff74..84b24db20 100644 --- a/go.mod +++ b/go.mod @@ -86,9 +86,8 @@ require ( github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699 // indirect github.com/multiplay/go-ts3 v1.0.0 github.com/naoina/go-stringutil v0.1.0 // indirect - github.com/nats-io/gnatsd v1.2.0 - github.com/nats-io/go-nats v1.5.0 - github.com/nats-io/nuid v1.0.0 // indirect + github.com/nats-io/nats-server/v2 v2.1.4 + github.com/nats-io/nats.go v1.9.1 github.com/nsqio/go-nsq v1.0.7 github.com/openconfig/gnmi v0.0.0-20180912164834-33a1865c3029 github.com/opencontainers/go-digest v1.0.0-rc1 // indirect diff --git a/go.sum b/go.sum index edb53ea89..f940ab12a 100644 --- a/go.sum +++ b/go.sum @@ -318,12 +318,18 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= -github.com/nats-io/gnatsd v1.2.0 h1:WKLzmB8LyP4CiVJuAoZMxdYBurENVX4piS358tjcBhw= -github.com/nats-io/gnatsd v1.2.0/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= -github.com/nats-io/go-nats v1.5.0 h1:OrEQSvQQrP+A+9EBBxY86Z4Es6uaUdObZ5UhWHn9b08= -github.com/nats-io/go-nats v1.5.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= -github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= -github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= +github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= +github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g= +github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= +github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= +github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= +github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY= github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -444,6 +450,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -493,6 +500,7 @@ golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo= diff --git a/plugins/inputs/nats/nats.go b/plugins/inputs/nats/nats.go index 83e262ec8..1afb0046d 100644 --- a/plugins/inputs/nats/nats.go +++ b/plugins/inputs/nats/nats.go @@ -13,7 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" - gnatsd "github.com/nats-io/gnatsd/server" + gnatsd "github.com/nats-io/nats-server/v2/server" ) type Nats struct { diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index 7c1abab0b..ae40d9185 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -23,6 +23,9 @@ instances of telegraf can read from a NATS cluster in parallel. # username = "" # password = "" + ## Optional NATS 2.0 and NATS NGS compatible user credentials + # credentials = "/etc/telegraf/nats.creds" + ## Use Transport Layer Security # secure = false diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index eff726964..6ac19b0a8 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -3,13 +3,14 @@ package natsconsumer import ( "context" "fmt" + "strings" "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - nats "github.com/nats-io/go-nats" + "github.com/nats-io/nats.go" ) var ( @@ -31,12 +32,14 @@ func (e natsError) Error() string { } type natsConsumer struct { - QueueGroup string `toml:"queue_group"` - Subjects []string `toml:"subjects"` - Servers []string `toml:"servers"` - Secure bool `toml:"secure"` - Username string `toml:"username"` - Password string `toml:"password"` + QueueGroup string `toml:"queue_group"` + Subjects []string `toml:"subjects"` + Servers []string `toml:"servers"` + Secure bool `toml:"secure"` + Username string `toml:"username"` + Password string `toml:"password"` + Credentials string `toml:"credentials"` + tls.ClientConfig Log telegraf.Logger @@ -77,6 +80,9 @@ var sampleConfig = ` # username = "" # password = "" + ## Optional NATS 2.0 and NATS NGS compatible user credentials + # credentials = "/etc/telegraf/nats.creds" + ## Use Transport Layer Security # secure = false @@ -135,19 +141,18 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { var connectErr error - // set default NATS connection options - opts := nats.DefaultOptions - - // override max reconnection tries - opts.MaxReconnect = -1 - - // override servers if any were specified - opts.Servers = n.Servers + options := []nats.Option{ + nats.MaxReconnects(-1), + nats.ErrorHandler(n.natsErrHandler), + } // override authentication, if any was specified - if n.Username != "" { - opts.User = n.Username - opts.Password = n.Password + if n.Username != "" && n.Password != "" { + options = append(options, nats.UserInfo(n.Username, n.Password)) + } + + if n.Credentials != "" { + options = append(options, nats.UserCredentials(n.Credentials)) } if n.Secure { @@ -156,19 +161,17 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { return err } - opts.Secure = true - opts.TLSConfig = tlsConfig + options = append(options, nats.Secure(tlsConfig)) } if n.conn == nil || n.conn.IsClosed() { - n.conn, connectErr = opts.Connect() + n.conn, connectErr = nats.Connect(strings.Join(n.Servers, ","), options...) if connectErr != nil { return connectErr } // Setup message and error channels n.errs = make(chan error) - n.conn.SetErrorHandler(n.natsErrHandler) n.in = make(chan *nats.Msg, 1000) for _, subj := range n.Subjects { @@ -178,14 +181,13 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { if err != nil { return err } - // ensure that the subscription has been processed by the server - if err = n.conn.Flush(); err != nil { - return err - } + // set the subscription pending limits - if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil { + err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) + if err != nil { return err } + n.subs = append(n.subs, sub) } } diff --git a/plugins/outputs/nats/README.md b/plugins/outputs/nats/README.md index f6dc04f53..c5539900b 100644 --- a/plugins/outputs/nats/README.md +++ b/plugins/outputs/nats/README.md @@ -9,6 +9,10 @@ This plugin writes to a (list of) specified NATS instance(s). ## Optional credentials # username = "" # password = "" + + ## Optional NATS 2.0 and NATS NGS compatible user credentials + # credentials = "/etc/telegraf/nats.creds" + ## NATS subject for producer messages subject = "telegraf" diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index e4817d6c9..620ac8b44 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -3,32 +3,40 @@ package nats import ( "fmt" "log" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" - nats_client "github.com/nats-io/go-nats" + "github.com/nats-io/nats.go" ) type NATS struct { - Servers []string `toml:"servers"` - Secure bool `toml:"secure"` - Username string `toml:"username"` - Password string `toml:"password"` - Subject string `toml:"subject"` + Servers []string `toml:"servers"` + Secure bool `toml:"secure"` + Username string `toml:"username"` + Password string `toml:"password"` + Credentials string `toml:"credentials"` + Subject string `toml:"subject"` + tls.ClientConfig - conn *nats_client.Conn + conn *nats.Conn serializer serializers.Serializer } var sampleConfig = ` ## URLs of NATS servers servers = ["nats://localhost:4222"] + ## Optional credentials # username = "" # password = "" + + ## Optional NATS 2.0 and NATS NGS compatible user credentials + # credentials = "/etc/telegraf/nats.creds" + ## NATS subject for producer messages subject = "telegraf" @@ -56,19 +64,13 @@ func (n *NATS) SetSerializer(serializer serializers.Serializer) { func (n *NATS) Connect() error { var err error - // set default NATS connection options - opts := nats_client.DefaultOptions - - // override max reconnection tries - opts.MaxReconnect = -1 - - // override servers, if any were specified - opts.Servers = n.Servers + opts := []nats.Option{ + nats.MaxReconnects(-1), + } // override authentication, if any was specified if n.Username != "" { - opts.User = n.Username - opts.Password = n.Password + opts = append(opts, nats.UserInfo(n.Username, n.Password)) } if n.Secure { @@ -77,12 +79,11 @@ func (n *NATS) Connect() error { return err } - opts.Secure = true - opts.TLSConfig = tlsConfig + opts = append(opts, nats.Secure(tlsConfig)) } // try and connect - n.conn, err = opts.Connect() + n.conn, err = nats.Connect(strings.Join(n.Servers, ","), opts...) return err }