Add support for TLS configuration in NSQ input (#3903)

This commit is contained in:
Soulou 2018-10-23 02:50:32 +02:00 committed by Daniel Nelson
parent 2e59e4dd6c
commit 12279042d3
2 changed files with 43 additions and 16 deletions

View File

@ -33,17 +33,27 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
// Might add Lookupd endpoints for cluster discovery // Might add Lookupd endpoints for cluster discovery
type NSQ struct { type NSQ struct {
Endpoints []string Endpoints []string
tls.ClientConfig
httpClient *http.Client
} }
var sampleConfig = ` var sampleConfig = `
## An array of NSQD HTTP API endpoints ## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"] endpoints = ["http://localhost:4151"]
## Or using HTTPS endpoint
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_ca = "/path/to/ca.pem"
insecure_skip_verify = false
` `
const ( const (
@ -52,10 +62,14 @@ const (
func init() { func init() {
inputs.Add("nsq", func() telegraf.Input { inputs.Add("nsq", func() telegraf.Input {
return &NSQ{} return New()
}) })
} }
func New() *NSQ {
return &NSQ{}
}
func (n *NSQ) SampleConfig() string { func (n *NSQ) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -65,6 +79,15 @@ func (n *NSQ) Description() string {
} }
func (n *NSQ) Gather(acc telegraf.Accumulator) error { func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var err error
if n.httpClient == nil {
n.httpClient, err = n.getHttpClient()
if err != nil {
return err
}
}
var wg sync.WaitGroup var wg sync.WaitGroup
for _, e := range n.Endpoints { for _, e := range n.Endpoints {
wg.Add(1) wg.Add(1)
@ -78,13 +101,19 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
var tr = &http.Transport{ func (n *NSQ) getHttpClient() (*http.Client, error) {
ResponseHeaderTimeout: time.Duration(3 * time.Second), tlsConfig, err := n.ClientConfig.TLSConfig()
} if err != nil {
return nil, err
var client = &http.Client{ }
Transport: tr, tr := &http.Transport{
Timeout: time.Duration(4 * time.Second), TLSClientConfig: tlsConfig,
}
httpClient := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return httpClient, nil
} }
func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
@ -92,7 +121,7 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
r, err := client.Get(u.String()) r, err := n.httpClient.Get(u.String())
if err != nil { if err != nil {
return fmt.Errorf("Error while polling %s: %s", u.String(), err) return fmt.Errorf("Error while polling %s: %s", u.String(), err)
} }

View File

@ -19,9 +19,8 @@ func TestNSQStatsV1(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
n := &NSQ{ n := New()
Endpoints: []string{ts.URL}, n.Endpoints = []string{ts.URL}
}
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(n.Gather) err := acc.GatherError(n.Gather)
@ -276,9 +275,8 @@ func TestNSQStatsPreV1(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
n := &NSQ{ n := New()
Endpoints: []string{ts.URL}, n.Endpoints = []string{ts.URL}
}
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(n.Gather) err := acc.GatherError(n.Gather)