From 6b0ea64d1b1c42415674464818085bcb2342cf14 Mon Sep 17 00:00:00 2001 From: "Dragostin Yanev (netixen)" Date: Fri, 12 Feb 2016 12:05:33 +0200 Subject: [PATCH] Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676 closes #680 --- CHANGELOG.md | 5 ++- README.md | 1 + plugins/inputs/nats_consumer/README.md | 4 +- plugins/inputs/nats_consumer/nats_consumer.go | 28 +++++++------- .../nats_consumer/nats_consumer_test.go | 38 +++++++++---------- 5 files changed, 39 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee69938be..535e0d067 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,10 @@ format that they would like to parse. Currently supports: "json", "influx", and [here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md) ### Features -- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin +- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin. Thanks @codehate! - [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs. -- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. +- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. Thanks @mikif70! +- [#680](https://github.com/influxdata/telegraf/pull/680): NATS consumer input plugin. Thanks @netixen! ### Bugfixes - [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux. diff --git a/README.md b/README.md index 6109e0841..ac6d68f0a 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,7 @@ Telegraf can also collect metrics via the following service plugins: * statsd * kafka_consumer +* nats_consumer * github_webhooks We'll be adding support for many more over the coming months. Read on if you diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index f3b67c9d5..b2d027039 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -19,8 +19,8 @@ from a NATS cluster in parallel. subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 4b25fa0a1..56d56990f 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -28,8 +28,8 @@ type natsConsumer struct { Servers []string Secure bool - PointBuffer int - parser parsers.Parser + MetricBuffer int + parser parsers.Parser sync.Mutex Conn *nats.Conn @@ -39,7 +39,7 @@ type natsConsumer struct { in chan *nats.Msg // channel for all NATS read errors errs chan error - // channel for all incoming parsed points + // channel for all incoming parsed metrics metricC chan telegraf.Metric done chan struct{} } @@ -53,8 +53,8 @@ var sampleConfig = ` subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read @@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error { } n.done = make(chan struct{}) - if n.PointBuffer == 0 { - n.PointBuffer = 100000 + if n.MetricBuffer == 0 { + n.MetricBuffer = 100000 } - n.metricC = make(chan telegraf.Metric, n.PointBuffer) + n.metricC = make(chan telegraf.Metric, n.MetricBuffer) // Start the message reader go n.receiver() @@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error { } // receiver() reads all incoming messages from NATS, and parses them into -// influxdb metric points. +// telegraf metrics. func (n *natsConsumer) receiver() { defer n.clean() for { @@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() { continue default: log.Printf("NATS Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() { func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { n.Lock() defer n.Unlock() - npoints := len(n.metricC) - for i := 0; i < npoints; i++ { - point := <-n.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(n.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-n.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 50c663cb4..214695d91 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -15,26 +15,26 @@ const ( testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 + metricBuffer = 5 ) func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { - in := make(chan *nats.Msg, pointBuffer) + in := make(chan *nats.Msg, metricBuffer) n := &natsConsumer{ - QueueGroup: "test", - Subjects: []string{"telegraf"}, - Servers: []string{"nats://localhost:4222"}, - Secure: false, - PointBuffer: pointBuffer, - in: in, - errs: make(chan error, pointBuffer), - done: make(chan struct{}), - metricC: make(chan telegraf.Metric, pointBuffer), + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + MetricBuffer: metricBuffer, + in: in, + errs: make(chan error, metricBuffer), + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, metricBuffer), } return n, in } -// Test that the parser parses NATS messages into points +// Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) { } } -// Test that points are dropped when we hit the buffer limit +// Test that metrics are dropped when we hit the buffer limit func TestRunParserRespectsBuffer(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() - for i := 0; i < pointBuffer+1; i++ { + for i := 0; i < metricBuffer+1; i++ { in <- natsMsg(testMsg) } time.Sleep(time.Millisecond) - if a := len(n.metricC); a != pointBuffer { - t.Errorf("got %v, expected %v", a, pointBuffer) + if a := len(n.metricC); a != metricBuffer { + t.Errorf("got %v, expected %v", a, metricBuffer) } } -// Test that the parser parses nats messages into points +// Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses graphite format messages into metrics func TestRunParserAndGatherGraphite(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses json format messages into metrics func TestRunParserAndGatherJSON(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done)