From 8b2578b475f796fa08dd46064179b5384506bd22 Mon Sep 17 00:00:00 2001 From: "Dragostin Yanev (netixen)" Date: Thu, 11 Feb 2016 01:28:52 +0200 Subject: [PATCH] Add NATS consumer input plugin. --- Godeps | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/nats_consumer/README.md | 38 ++++ plugins/inputs/nats_consumer/nats_consumer.go | 202 ++++++++++++++++++ .../nats_consumer/nats_consumer_test.go | 152 +++++++++++++ 5 files changed, 394 insertions(+) create mode 100644 plugins/inputs/nats_consumer/README.md create mode 100644 plugins/inputs/nats_consumer/nats_consumer.go create mode 100644 plugins/inputs/nats_consumer/nats_consumer_test.go diff --git a/Godeps b/Godeps index 5cdfecbe7..0b9a16727 100644 --- a/Godeps +++ b/Godeps @@ -28,6 +28,7 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3 github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9 +github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index e7329b042..794885129 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -22,6 +22,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" + _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md new file mode 100644 index 000000000..f3b67c9d5 --- /dev/null +++ b/plugins/inputs/nats_consumer/README.md @@ -0,0 +1,38 @@ +# NATS Consumer + +The [NATS](http://www.nats.io/about/) consumer plugin reads from +specified NATS subjects and adds messages to InfluxDB. The plugin expects messages +in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). +A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/) +is used when subscribing to subjects so multiple instances of telegraf can read +from a NATS cluster in parallel. + +## Configuration +``` +# Read metrics from NATS subject(s) +[[inputs.nats_consumer]] + ### urls of NATS servers + servers = ["nats://localhost:4222"] + ### Use Transport Layer Security + secure = false + ### subject(s) to consume + subjects = ["telegraf"] + ### name a queue group + queue_group = "telegraf_consumers" + ### Maximum number of points to buffer between collection intervals + point_buffer = 100000 + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +## Testing + +To run tests: + +``` +go test +``` \ No newline at end of file diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go new file mode 100644 index 000000000..4b25fa0a1 --- /dev/null +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -0,0 +1,202 @@ +package natsconsumer + +import ( + "fmt" + "log" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/nats-io/nats" +) + +type natsError struct { + conn *nats.Conn + sub *nats.Subscription + err error +} + +func (e natsError) Error() string { + return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s", + e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue) +} + +type natsConsumer struct { + QueueGroup string + Subjects []string + Servers []string + Secure bool + + PointBuffer int + parser parsers.Parser + + sync.Mutex + Conn *nats.Conn + Subs []*nats.Subscription + + // channel for all incoming NATS messages + in chan *nats.Msg + // channel for all NATS read errors + errs chan error + // channel for all incoming parsed points + metricC chan telegraf.Metric + done chan struct{} +} + +var sampleConfig = ` + ### urls of NATS servers + servers = ["nats://localhost:4222"] + ### Use Transport Layer Security + secure = false + ### subject(s) to consume + subjects = ["telegraf"] + ### name a queue group + queue_group = "telegraf_consumers" + ### Maximum number of points to buffer between collection intervals + point_buffer = 100000 + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (n *natsConsumer) SampleConfig() string { + return sampleConfig +} + +func (n *natsConsumer) Description() string { + return "Read metrics from NATS subject(s)" +} + +func (n *natsConsumer) SetParser(parser parsers.Parser) { + n.parser = parser +} + +func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) { + select { + case n.errs <- natsError{conn: c, sub: s, err: e}: + default: + return + } +} + +// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. +func (n *natsConsumer) Start() error { + n.Lock() + defer n.Unlock() + + var connectErr error + + opts := nats.DefaultOptions + opts.Servers = n.Servers + opts.Secure = n.Secure + + if n.Conn == nil || n.Conn.IsClosed() { + n.Conn, connectErr = opts.Connect() + if connectErr != nil { + return connectErr + } + + // Setup message and error channels + n.errs = make(chan error) + n.Conn.SetErrorHandler(n.natsErrHandler) + + n.in = make(chan *nats.Msg) + for _, subj := range n.Subjects { + sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) + if err != nil { + return err + } + n.Subs = append(n.Subs, sub) + } + } + + n.done = make(chan struct{}) + if n.PointBuffer == 0 { + n.PointBuffer = 100000 + } + + n.metricC = make(chan telegraf.Metric, n.PointBuffer) + + // Start the message reader + go n.receiver() + log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", + n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) + + return nil +} + +// receiver() reads all incoming messages from NATS, and parses them into +// influxdb metric points. +func (n *natsConsumer) receiver() { + defer n.clean() + for { + select { + case <-n.done: + return + case err := <-n.errs: + log.Printf("error reading from %s\n", err.Error()) + case msg := <-n.in: + metrics, err := n.parser.Parse(msg.Data) + if err != nil { + log.Printf("subject: %s, error: %s", msg.Subject, err.Error()) + } + + for _, metric := range metrics { + select { + case n.metricC <- metric: + continue + default: + log.Printf("NATS Consumer buffer is full, dropping a metric." + + " You may want to increase the point_buffer setting") + } + } + + } + } +} + +func (n *natsConsumer) clean() { + n.Lock() + defer n.Unlock() + close(n.in) + close(n.metricC) + close(n.errs) + + for _, sub := range n.Subs { + if err := sub.Unsubscribe(); err != nil { + log.Printf("Error unsubscribing from subject %s in queue %s: %s\n", + sub.Subject, sub.Queue, err.Error()) + } + } + + if n.Conn != nil && !n.Conn.IsClosed() { + n.Conn.Close() + } +} + +func (n *natsConsumer) Stop() { + n.Lock() + close(n.done) + n.Unlock() +} + +func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { + n.Lock() + defer n.Unlock() + npoints := len(n.metricC) + for i := 0; i < npoints; i++ { + point := <-n.metricC + acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + } + return nil +} + +func init() { + inputs.Add("nats_consumer", func() telegraf.Input { + return &natsConsumer{} + }) +} diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go new file mode 100644 index 000000000..50c663cb4 --- /dev/null +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -0,0 +1,152 @@ +package natsconsumer + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/nats-io/nats" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257" + pointBuffer = 5 +) + +func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { + in := make(chan *nats.Msg, pointBuffer) + n := &natsConsumer{ + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + PointBuffer: pointBuffer, + in: in, + errs: make(chan error, pointBuffer), + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, pointBuffer), + } + return n, in +} + +// Test that the parser parses NATS messages into points +func TestRunParser(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(testMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } +} + +// Test that the parser ignores invalid messages +func TestRunParserInvalidMsg(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(invalidMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +// Test that points are dropped when we hit the buffer limit +func TestRunParserRespectsBuffer(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + for i := 0; i < pointBuffer+1; i++ { + in <- natsMsg(testMsg) + } + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != pointBuffer { + t.Errorf("got %v, expected %v", a, pointBuffer) + } +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGather(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- natsMsg(testMsg) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGatherGraphite(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go n.receiver() + in <- natsMsg(testMsgGraphite) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses nats messages into points +func TestRunParserAndGatherJSON(t *testing.T) { + n, in := newTestNatsConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + go n.receiver() + in <- natsMsg(testMsgJSON) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "nats_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + +func natsMsg(val string) *nats.Msg { + return &nats.Msg{ + Subject: "telegraf", + Data: []byte(val), + } +}