Add support for NSQLookupd to nsq_consumer (#3215)

This commit is contained in:
Lukasz Jagiello 2017-09-25 16:33:05 -07:00 committed by Daniel Nelson
parent 4ccef6f99e
commit 77c7b6bee5
5 changed files with 29 additions and 11 deletions

2
Godeps
View File

@ -44,7 +44,7 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
github.com/nsqio/go-nsq a53d495e81424aaf7a7665a9d32a97715c40e953 github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8
github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c
github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827

View File

@ -2595,8 +2595,12 @@
# # Read NSQ topic for metrics. # # Read NSQ topic for metrics.
# [[inputs.nsq_consumer]] # [[inputs.nsq_consumer]]
# ## An string representing the NSQD TCP Endpoint # ## Server option still works but is deprecated, we just prepend it to the nsqd array.
# server = "localhost:4150" # # server = "localhost:4150"
# ## An array representing the NSQD TCP HTTP Endpoints
# nsqd = ["localhost:4150"]
# ## An array representing the NSQLookupd HTTP Endpoints
# nsqlookupd = ["localhost:4161"]
# topic = "telegraf" # topic = "telegraf"
# channel = "consumer" # channel = "consumer"
# max_in_flight = 100 # max_in_flight = 100
@ -2764,4 +2768,3 @@
# [[inputs.zipkin]] # [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data # # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens # # port = 9411 # Port on which Telegraf listens

View File

@ -1,15 +1,19 @@
# NSQ Consumer Input Plugin # NSQ Consumer Input Plugin
The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.
## Configuration ## Configuration
```toml ```toml
# Read metrics from NSQD topic(s) # Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]] [[inputs.nsq_consumer]]
## An array of NSQD HTTP API endpoints ## Server option still works but is deprecated, we just prepend it to the nsqd array.
server = "localhost:4150" # server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf" topic = "telegraf"
channel = "consumer" channel = "consumer"
max_in_flight = 100 max_in_flight = 100

View File

@ -6,12 +6,14 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/nsqio/go-nsq" nsq "github.com/nsqio/go-nsq"
) )
//NSQConsumer represents the configuration of the plugin //NSQConsumer represents the configuration of the plugin
type NSQConsumer struct { type NSQConsumer struct {
Server string Server string
Nsqd []string
Nsqlookupd []string
Topic string Topic string
Channel string Channel string
MaxInFlight int MaxInFlight int
@ -21,8 +23,12 @@ type NSQConsumer struct {
} }
var sampleConfig = ` var sampleConfig = `
## An string representing the NSQD TCP Endpoint ## Server option still works but is deprecated, we just prepend it to the nsqd array.
server = "localhost:4150" # server = "localhost:4150"
## An array representing the NSQD TCP HTTP Endpoints
nsqd = ["localhost:4150"]
## An array representing the NSQLookupd HTTP Endpoints
nsqlookupd = ["localhost:4161"]
topic = "telegraf" topic = "telegraf"
channel = "consumer" channel = "consumer"
max_in_flight = 100 max_in_flight = 100
@ -71,7 +77,11 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
message.Finish() message.Finish()
return nil return nil
}), n.MaxInFlight) }), n.MaxInFlight)
n.consumer.ConnectToNSQD(n.Server)
if len(n.Nsqlookupd) > 0 {
n.consumer.ConnectToNSQLookupds(n.Nsqlookupd)
}
n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server))
return nil return nil
} }

View File

@ -40,6 +40,7 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
Topic: "telegraf", Topic: "telegraf",
Channel: "consume", Channel: "consume",
MaxInFlight: 1, MaxInFlight: 1,
Nsqd: []string{"127.0.0.1:4155"},
} }
p, _ := parsers.NewInfluxParser() p, _ := parsers.NewInfluxParser()