diff --git a/Godeps b/Godeps index 654a345eb..0802675ba 100644 --- a/Godeps +++ b/Godeps @@ -44,7 +44,7 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f -github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953 +github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 diff --git a/etc/telegraf.conf b/etc/telegraf.conf index c636efe56..f89d54b96 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -2595,8 +2595,12 @@ # # Read NSQ topic for metrics. # [[inputs.nsq_consumer]] -# ## An string representing the NSQD TCP Endpoint -# server = "localhost:4150" +# ## Server option still works but is deprecated, we just prepend it to the nsqd array. +# # server = "localhost:4150" +# ## An array representing the NSQD TCP HTTP Endpoints +# nsqd = ["localhost:4150"] +# ## An array representing the NSQLookupd HTTP Endpoints +# nsqlookupd = ["localhost:4161"] # topic = "telegraf" # channel = "consumer" # max_in_flight = 100 @@ -2764,4 +2768,3 @@ # [[inputs.zipkin]] # # path = "/api/v1/spans" # URL path for span data # # port = 9411 # Port on which Telegraf listens - diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md index d207d8de1..5ac156eec 100644 --- a/plugins/inputs/nsq_consumer/README.md +++ b/plugins/inputs/nsq_consumer/README.md @@ -1,15 +1,19 @@ # NSQ Consumer Input Plugin The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD -topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. +topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. ## Configuration ```toml # Read metrics from NSQD topic(s) [[inputs.nsq_consumer]] - ## An array of NSQD HTTP API endpoints - server = "localhost:4150" + ## Server option still works but is deprecated, we just prepend it to the nsqd array. + # server = "localhost:4150" + ## An array representing the NSQD TCP HTTP Endpoints + nsqd = ["localhost:4150"] + ## An array representing the NSQLookupd HTTP Endpoints + nsqlookupd = ["localhost:4161"] topic = "telegraf" channel = "consumer" max_in_flight = 100 diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go index b93c4c68e..0823b3ac9 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -6,12 +6,14 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/nsqio/go-nsq" + nsq "github.com/nsqio/go-nsq" ) //NSQConsumer represents the configuration of the plugin type NSQConsumer struct { Server string + Nsqd []string + Nsqlookupd []string Topic string Channel string MaxInFlight int @@ -21,8 +23,12 @@ type NSQConsumer struct { } var sampleConfig = ` - ## An string representing the NSQD TCP Endpoint - server = "localhost:4150" + ## Server option still works but is deprecated, we just prepend it to the nsqd array. + # server = "localhost:4150" + ## An array representing the NSQD TCP HTTP Endpoints + nsqd = ["localhost:4150"] + ## An array representing the NSQLookupd HTTP Endpoints + nsqlookupd = ["localhost:4161"] topic = "telegraf" channel = "consumer" max_in_flight = 100 @@ -71,7 +77,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { message.Finish() return nil }), n.MaxInFlight) - n.consumer.ConnectToNSQD(n.Server) + + if len(n.Nsqlookupd) > 0 { + n.consumer.ConnectToNSQLookupds(n.Nsqlookupd) + } + n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server)) return nil } diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go index 9342e13ac..a6d8c27e5 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer_test.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -40,6 +40,7 @@ func TestReadsMetricsFromNSQ(t *testing.T) { Topic: "telegraf", Channel: "consume", MaxInFlight: 1, + Nsqd: []string{"127.0.0.1:4155"}, } p, _ := parsers.NewInfluxParser()