From 5c4146fe68df2d7c453e9c1c0b7195f75643bb1e Mon Sep 17 00:00:00 2001 From: "Dragostin Yanev (netixen)" Date: Fri, 12 Feb 2016 12:05:33 +0200 Subject: [PATCH] Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676 --- plugins/inputs/nats_consumer/README.md | 4 +- plugins/inputs/nats_consumer/nats_consumer.go | 28 +++++++------- .../nats_consumer/nats_consumer_test.go | 38 +++++++++---------- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index f3b67c9d5..b2d027039 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -19,8 +19,8 @@ from a NATS cluster in parallel. subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 4b25fa0a1..56d56990f 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -28,8 +28,8 @@ type natsConsumer struct { Servers []string Secure bool - PointBuffer int - parser parsers.Parser + MetricBuffer int + parser parsers.Parser sync.Mutex Conn *nats.Conn @@ -39,7 +39,7 @@ type natsConsumer struct { in chan *nats.Msg // channel for all NATS read errors errs chan error - // channel for all incoming parsed points + // channel for all incoming parsed metrics metricC chan telegraf.Metric done chan struct{} } @@ -53,8 +53,8 @@ var sampleConfig = ` subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read @@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error { } n.done = make(chan struct{}) - if n.PointBuffer == 0 { - n.PointBuffer = 100000 + if n.MetricBuffer == 0 { + n.MetricBuffer = 100000 } - n.metricC = make(chan telegraf.Metric, n.PointBuffer) + n.metricC = make(chan telegraf.Metric, n.MetricBuffer) // Start the message reader go n.receiver() @@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error { } // receiver() reads all incoming messages from NATS, and parses them into -// influxdb metric points. +// telegraf metrics. func (n *natsConsumer) receiver() { defer n.clean() for { @@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() { continue default: log.Printf("NATS Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() { func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { n.Lock() defer n.Unlock() - npoints := len(n.metricC) - for i := 0; i < npoints; i++ { - point := <-n.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(n.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-n.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 50c663cb4..214695d91 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -15,26 +15,26 @@ const ( testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 + metricBuffer = 5 ) func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { - in := make(chan *nats.Msg, pointBuffer) + in := make(chan *nats.Msg, metricBuffer) n := &natsConsumer{ - QueueGroup: "test", - Subjects: []string{"telegraf"}, - Servers: []string{"nats://localhost:4222"}, - Secure: false, - PointBuffer: pointBuffer, - in: in, - errs: make(chan error, pointBuffer), - done: make(chan struct{}), - metricC: make(chan telegraf.Metric, pointBuffer), + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + MetricBuffer: metricBuffer, + in: in, + errs: make(chan error, metricBuffer), + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, metricBuffer), } return n, in } -// Test that the parser parses NATS messages into points +// Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) { } } -// Test that points are dropped when we hit the buffer limit +// Test that metrics are dropped when we hit the buffer limit func TestRunParserRespectsBuffer(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() - for i := 0; i < pointBuffer+1; i++ { + for i := 0; i < metricBuffer+1; i++ { in <- natsMsg(testMsg) } time.Sleep(time.Millisecond) - if a := len(n.metricC); a != pointBuffer { - t.Errorf("got %v, expected %v", a, pointBuffer) + if a := len(n.metricC); a != metricBuffer { + t.Errorf("got %v, expected %v", a, metricBuffer) } } -// Test that the parser parses nats messages into points +// Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses graphite format messages into metrics func TestRunParserAndGatherGraphite(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done) @@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } -// Test that the parser parses nats messages into points +// Test that the parser parses json format messages into metrics func TestRunParserAndGatherJSON(t *testing.T) { n, in := newTestNatsConsumer() defer close(n.done)