Add support for credentials file to nats_consumer and nats output (#7022)

This commit is contained in:
R.I.Pienaar 2020-02-20 22:30:04 +00:00 committed by GitHub
parent 82a358b910
commit 79ff743064
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 78 additions and 59 deletions

View File

@ -83,8 +83,10 @@ following works:
- github.com/mitchellh/mapstructure [MIT License](https://github.com/mitchellh/mapstructure/blob/master/LICENSE) - github.com/mitchellh/mapstructure [MIT License](https://github.com/mitchellh/mapstructure/blob/master/LICENSE)
- github.com/multiplay/go-ts3 [BSD 2-Clause "Simplified" License](https://github.com/multiplay/go-ts3/blob/master/LICENSE) - github.com/multiplay/go-ts3 [BSD 2-Clause "Simplified" License](https://github.com/multiplay/go-ts3/blob/master/LICENSE)
- github.com/naoina/go-stringutil [MIT License](https://github.com/naoina/go-stringutil/blob/master/LICENSE) - github.com/naoina/go-stringutil [MIT License](https://github.com/naoina/go-stringutil/blob/master/LICENSE)
- github.com/nats-io/gnatsd [Apache License 2.0](https://github.com/nats-io/gnatsd/blob/master/LICENSE) - github.com/nats-io/nats-server [Apache License 2.0](https://github.com/nats-io/nats-server/blob/master/LICENSE)
- github.com/nats-io/go-nats [Apache License 2.0](https://github.com/nats-io/go-nats/blob/master/LICENSE) - github.com/nats-io/nats.go [Apache License 2.0](https://github.com/nats-io/nats.go/blob/master/LICENSE)
- github.com/nats-io/jwt [Apache License 2.0](https://github.com/nats-io/jwt/blob/master/LICENSE)
- github.com/nats-io/nkeys [Apache License 2.0](https://github.com/nats-io/nkeys/blob/master/LICENSE)
- github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE) - github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE)
- github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE) - github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE)
- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE) - github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE)

5
go.mod
View File

@ -86,9 +86,8 @@ require (
github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699 // indirect github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699 // indirect
github.com/multiplay/go-ts3 v1.0.0 github.com/multiplay/go-ts3 v1.0.0
github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/nats-io/gnatsd v1.2.0 github.com/nats-io/nats-server/v2 v2.1.4
github.com/nats-io/go-nats v1.5.0 github.com/nats-io/nats.go v1.9.1
github.com/nats-io/nuid v1.0.0 // indirect
github.com/nsqio/go-nsq v1.0.7 github.com/nsqio/go-nsq v1.0.7
github.com/openconfig/gnmi v0.0.0-20180912164834-33a1865c3029 github.com/openconfig/gnmi v0.0.0-20180912164834-33a1865c3029
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 // indirect

20
go.sum
View File

@ -318,12 +318,18 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/nats-io/gnatsd v1.2.0 h1:WKLzmB8LyP4CiVJuAoZMxdYBurENVX4piS358tjcBhw= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/gnatsd v1.2.0/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/go-nats v1.5.0 h1:OrEQSvQQrP+A+9EBBxY86Z4Es6uaUdObZ5UhWHn9b08= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/go-nats v1.5.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g=
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY= github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY=
github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito= github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@ -444,6 +450,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -493,6 +500,7 @@ golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo=

View File

@ -13,7 +13,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
gnatsd "github.com/nats-io/gnatsd/server" gnatsd "github.com/nats-io/nats-server/v2/server"
) )
type Nats struct { type Nats struct {

View File

@ -23,6 +23,9 @@ instances of telegraf can read from a NATS cluster in parallel.
# username = "" # username = ""
# password = "" # password = ""
## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"
## Use Transport Layer Security ## Use Transport Layer Security
# secure = false # secure = false

View File

@ -3,13 +3,14 @@ package natsconsumer
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats" "github.com/nats-io/nats.go"
) )
var ( var (
@ -37,6 +38,8 @@ type natsConsumer struct {
Secure bool `toml:"secure"` Secure bool `toml:"secure"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Credentials string `toml:"credentials"`
tls.ClientConfig tls.ClientConfig
Log telegraf.Logger Log telegraf.Logger
@ -77,6 +80,9 @@ var sampleConfig = `
# username = "" # username = ""
# password = "" # password = ""
## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"
## Use Transport Layer Security ## Use Transport Layer Security
# secure = false # secure = false
@ -135,19 +141,18 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
var connectErr error var connectErr error
// set default NATS connection options options := []nats.Option{
opts := nats.DefaultOptions nats.MaxReconnects(-1),
nats.ErrorHandler(n.natsErrHandler),
// override max reconnection tries }
opts.MaxReconnect = -1
// override servers if any were specified
opts.Servers = n.Servers
// override authentication, if any was specified // override authentication, if any was specified
if n.Username != "" { if n.Username != "" && n.Password != "" {
opts.User = n.Username options = append(options, nats.UserInfo(n.Username, n.Password))
opts.Password = n.Password }
if n.Credentials != "" {
options = append(options, nats.UserCredentials(n.Credentials))
} }
if n.Secure { if n.Secure {
@ -156,19 +161,17 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
return err return err
} }
opts.Secure = true options = append(options, nats.Secure(tlsConfig))
opts.TLSConfig = tlsConfig
} }
if n.conn == nil || n.conn.IsClosed() { if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect() n.conn, connectErr = nats.Connect(strings.Join(n.Servers, ","), options...)
if connectErr != nil { if connectErr != nil {
return connectErr return connectErr
} }
// Setup message and error channels // Setup message and error channels
n.errs = make(chan error) n.errs = make(chan error)
n.conn.SetErrorHandler(n.natsErrHandler)
n.in = make(chan *nats.Msg, 1000) n.in = make(chan *nats.Msg, 1000)
for _, subj := range n.Subjects { for _, subj := range n.Subjects {
@ -178,14 +181,13 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
// ensure that the subscription has been processed by the server
if err = n.conn.Flush(); err != nil {
return err
}
// set the subscription pending limits // set the subscription pending limits
if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil { err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err return err
} }
n.subs = append(n.subs, sub) n.subs = append(n.subs, sub)
} }
} }

View File

@ -9,6 +9,10 @@ This plugin writes to a (list of) specified NATS instance(s).
## Optional credentials ## Optional credentials
# username = "" # username = ""
# password = "" # password = ""
## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"
## NATS subject for producer messages ## NATS subject for producer messages
subject = "telegraf" subject = "telegraf"

View File

@ -3,12 +3,13 @@ package nats
import ( import (
"fmt" "fmt"
"log" "log"
"strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
nats_client "github.com/nats-io/go-nats" "github.com/nats-io/nats.go"
) )
type NATS struct { type NATS struct {
@ -16,19 +17,26 @@ type NATS struct {
Secure bool `toml:"secure"` Secure bool `toml:"secure"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Credentials string `toml:"credentials"`
Subject string `toml:"subject"` Subject string `toml:"subject"`
tls.ClientConfig tls.ClientConfig
conn *nats_client.Conn conn *nats.Conn
serializer serializers.Serializer serializer serializers.Serializer
} }
var sampleConfig = ` var sampleConfig = `
## URLs of NATS servers ## URLs of NATS servers
servers = ["nats://localhost:4222"] servers = ["nats://localhost:4222"]
## Optional credentials ## Optional credentials
# username = "" # username = ""
# password = "" # password = ""
## Optional NATS 2.0 and NATS NGS compatible user credentials
# credentials = "/etc/telegraf/nats.creds"
## NATS subject for producer messages ## NATS subject for producer messages
subject = "telegraf" subject = "telegraf"
@ -56,19 +64,13 @@ func (n *NATS) SetSerializer(serializer serializers.Serializer) {
func (n *NATS) Connect() error { func (n *NATS) Connect() error {
var err error var err error
// set default NATS connection options opts := []nats.Option{
opts := nats_client.DefaultOptions nats.MaxReconnects(-1),
}
// override max reconnection tries
opts.MaxReconnect = -1
// override servers, if any were specified
opts.Servers = n.Servers
// override authentication, if any was specified // override authentication, if any was specified
if n.Username != "" { if n.Username != "" {
opts.User = n.Username opts = append(opts, nats.UserInfo(n.Username, n.Password))
opts.Password = n.Password
} }
if n.Secure { if n.Secure {
@ -77,12 +79,11 @@ func (n *NATS) Connect() error {
return err return err
} }
opts.Secure = true opts = append(opts, nats.Secure(tlsConfig))
opts.TLSConfig = tlsConfig
} }
// try and connect // try and connect
n.conn, err = opts.Connect() n.conn, err = nats.Connect(strings.Join(n.Servers, ","), opts...)
return err return err
} }