From 19149d3e132a5dc334d75cfd71c9436ccf50e88b Mon Sep 17 00:00:00 2001 From: Jonathan Chauncey Date: Thu, 9 Jun 2016 13:31:05 -0600 Subject: [PATCH] feat(nsq_consumer): Add input plugin to consume metrics from an nsqd topic --- CHANGELOG.md | 2 + README.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/nsq_consumer/README.md | 25 ++ plugins/inputs/nsq_consumer/nsq_consumer.go | 99 +++++++ .../inputs/nsq_consumer/nsq_consumer_test.go | 245 ++++++++++++++++++ 6 files changed, 373 insertions(+) create mode 100644 plugins/inputs/nsq_consumer/README.md create mode 100644 plugins/inputs/nsq_consumer/nsq_consumer.go create mode 100644 plugins/inputs/nsq_consumer/nsq_consumer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b942ec953..1afb9c895 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine - [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns. - [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL +- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. + ### Bugfixes diff --git a/README.md b/README.md index 682e96101..c463b919b 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,7 @@ Telegraf can also collect metrics via the following service plugins: * [mqtt_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/mqtt_consumer) * [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer) * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) +* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) * [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks) * [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1d8472469..03f0d40d1 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -41,6 +41,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" + _ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/nstat" _ "github.com/influxdata/telegraf/plugins/inputs/ntpq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md new file mode 100644 index 000000000..eac494ccb --- /dev/null +++ b/plugins/inputs/nsq_consumer/README.md @@ -0,0 +1,25 @@ +# NSQ Consumer Input Plugin + +The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD +topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. + +## Configuration + +```toml +# Read metrics from NSQD topic(s) +[[inputs.nsq_consumer]] + ## An array of NSQD HTTP API endpoints + server = "localhost:4150" + topic = "telegraf" + channel = "consumer" + max_in_flight = 100 + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +## Testing +The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies. diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go new file mode 100644 index 000000000..b227b7e50 --- /dev/null +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -0,0 +1,99 @@ +package nsq_consumer + +import ( + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/nsqio/go-nsq" +) + +//NSQConsumer represents the configuration of the plugin +type NSQConsumer struct { + Server string + Topic string + Channel string + MaxInFlight int + parser parsers.Parser + consumer *nsq.Consumer + acc telegraf.Accumulator +} + +var sampleConfig = ` + ## An string representing the NSQD TCP Endpoint + server = "localhost:4150" + topic = "telegraf" + channel = "consumer" + max_in_flight = 100 + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func init() { + inputs.Add("nsq_consumer", func() telegraf.Input { + return &NSQConsumer{} + }) +} + +// SetParser takes the data_format from the config and finds the right parser for that format +func (n *NSQConsumer) SetParser(parser parsers.Parser) { + n.parser = parser +} + +// SampleConfig returns config values for generating a sample configuration file +func (n *NSQConsumer) SampleConfig() string { + return sampleConfig +} + +// Description prints description string +func (n *NSQConsumer) Description() string { + return "Read NSQ topic for metrics." +} + +// Start pulls data from nsq +func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { + n.acc = acc + n.connect() + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error { + metrics, err := n.parser.Parse(message.Body) + if err != nil { + log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()) + return nil + } + for _, metric := range metrics { + n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + message.Finish() + return nil + }), n.MaxInFlight) + n.consumer.ConnectToNSQD(n.Server) + return nil +} + +// Stop processing messages +func (n *NSQConsumer) Stop() { + n.consumer.Stop() +} + +// Gather is a noop +func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (n *NSQConsumer) connect() error { + if n.consumer == nil { + config := nsq.NewConfig() + config.MaxInFlight = n.MaxInFlight + consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config) + if err != nil { + return err + } + n.consumer = consumer + } + return nil +} diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go new file mode 100644 index 000000000..59db675a5 --- /dev/null +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -0,0 +1,245 @@ +package nsq_consumer + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "log" + "net" + "strconv" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +// This test is modeled after the kafka consumer integration test +func TestReadsMetricsFromNSQ(t *testing.T) { + msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + msg := nsq.NewMessage(msgID, []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257")) + + script := []instruction{ + // SUB + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155") + newMockNSQD(script, addr.String()) + + consumer := &NSQConsumer{ + Server: "127.0.0.1:4155", + Topic: "telegraf", + Channel: "consume", + MaxInFlight: 1, + } + + p, _ := parsers.NewInfluxParser() + consumer.SetParser(p) + var acc testutil.Accumulator + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := consumer.Start(&acc); err != nil { + t.Fatal(err.Error()) + } else { + defer consumer.Stop() + } + + waitForPoint(&acc, t) + + if len(acc.Metrics) == 1 { + point := acc.Metrics[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": "us-west", + }, point.Tags) + assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) + } else { + t.Errorf("No points found in accumulator, expected 1") + } + +} + +// Waits for the metric that was sent to the kafka broker to arrive at the kafka +// consumer +func waitForPoint(acc *testutil.Accumulator, t *testing.T) { + // Give the kafka container up to 2 seconds to get the point to the consumer + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + counter := 0 + for { + select { + case <-ticker.C: + counter++ + if counter > 1000 { + t.Fatal("Waited for 5s, point never arrived to consumer") + } else if acc.NFields() == 1 { + return + } + } + } +} + +func newMockNSQD(script []instruction, addr string) *mockNSQD { + n := &mockNSQD{ + script: script, + exitChan: make(chan int), + } + + tcpListener, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err) + } + n.tcpListener = tcpListener + n.tcpAddr = tcpListener.Addr().(*net.TCPAddr) + + go n.listen() + + return n +} + +// The code below allows us to mock the interactions with nsqd. This is taken from: +// https://github.com/nsqio/go-nsq/blob/master/mock_test.go +type instruction struct { + delay time.Duration + frameType int32 + body []byte +} + +type mockNSQD struct { + script []instruction + got [][]byte + tcpAddr *net.TCPAddr + tcpListener net.Listener + exitChan chan int +} + +func (n *mockNSQD) listen() { + for { + conn, err := n.tcpListener.Accept() + if err != nil { + break + } + go n.handle(conn) + } + close(n.exitChan) +} + +func (n *mockNSQD) handle(conn net.Conn) { + var idx int + buf := make([]byte, 4) + _, err := io.ReadFull(conn, buf) + if err != nil { + log.Fatalf("ERROR: failed to read protocol version - %s", err) + } + + readChan := make(chan []byte) + readDoneChan := make(chan int) + scriptTime := time.After(n.script[0].delay) + rdr := bufio.NewReader(conn) + + go func() { + for { + line, err := rdr.ReadBytes('\n') + if err != nil { + return + } + // trim the '\n' + line = line[:len(line)-1] + readChan <- line + <-readDoneChan + } + }() + + var rdyCount int + for idx < len(n.script) { + select { + case line := <-readChan: + n.got = append(n.got, line) + params := bytes.Split(line, []byte(" ")) + switch { + case bytes.Equal(params[0], []byte("IDENTIFY")): + l := make([]byte, 4) + _, err := io.ReadFull(rdr, l) + if err != nil { + log.Printf(err.Error()) + goto exit + } + size := int32(binary.BigEndian.Uint32(l)) + b := make([]byte, size) + _, err = io.ReadFull(rdr, b) + if err != nil { + log.Printf(err.Error()) + goto exit + } + case bytes.Equal(params[0], []byte("RDY")): + rdy, _ := strconv.Atoi(string(params[1])) + rdyCount = rdy + case bytes.Equal(params[0], []byte("FIN")): + case bytes.Equal(params[0], []byte("REQ")): + } + readDoneChan <- 1 + case <-scriptTime: + inst := n.script[idx] + if bytes.Equal(inst.body, []byte("exit")) { + goto exit + } + if inst.frameType == nsq.FrameTypeMessage { + if rdyCount == 0 { + scriptTime = time.After(n.script[idx+1].delay) + continue + } + rdyCount-- + } + _, err := conn.Write(framedResponse(inst.frameType, inst.body)) + if err != nil { + log.Printf(err.Error()) + goto exit + } + scriptTime = time.After(n.script[idx+1].delay) + idx++ + } + } + +exit: + n.tcpListener.Close() + conn.Close() +} + +func framedResponse(frameType int32, data []byte) []byte { + var w bytes.Buffer + + beBuf := make([]byte, 4) + size := uint32(len(data)) + 4 + + binary.BigEndian.PutUint32(beBuf, size) + _, err := w.Write(beBuf) + if err != nil { + return nil + } + + binary.BigEndian.PutUint32(beBuf, uint32(frameType)) + _, err = w.Write(beBuf) + if err != nil { + return nil + } + + w.Write(data) + return w.Bytes() +} + +func frameMessage(m *nsq.Message) []byte { + var b bytes.Buffer + m.WriteTo(&b) + return b.Bytes() +}