Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676

closes #680
This commit is contained in:
Dragostin Yanev (netixen) 2016-02-12 12:05:33 +02:00 committed by Cameron Sparr
parent 512d9822f0
commit 6c353e8b8f
5 changed files with 39 additions and 37 deletions

View File

@ -8,9 +8,10 @@ format that they would like to parse. Currently supports: "json", "influx", and
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md) [here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md)
### Features ### Features
- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin - [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin. Thanks @codehate!
- [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs. - [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs.
- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. - [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin. Thanks @mikif70!
- [#680](https://github.com/influxdata/telegraf/pull/680): NATS consumer input plugin. Thanks @netixen!
### Bugfixes ### Bugfixes
- [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux. - [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux.

View File

@ -204,6 +204,7 @@ Telegraf can also collect metrics via the following service plugins:
* statsd * statsd
* kafka_consumer * kafka_consumer
* nats_consumer
* github_webhooks * github_webhooks
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you

View File

@ -19,8 +19,8 @@ from a NATS cluster in parallel.
subjects = ["telegraf"] subjects = ["telegraf"]
### name a queue group ### name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals ### Maximum number of metrics to buffer between collection intervals
point_buffer = 100000 metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite" ### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read ### Each data format has it's own unique set of configuration options, read

View File

@ -28,7 +28,7 @@ type natsConsumer struct {
Servers []string Servers []string
Secure bool Secure bool
PointBuffer int MetricBuffer int
parser parsers.Parser parser parsers.Parser
sync.Mutex sync.Mutex
@ -39,7 +39,7 @@ type natsConsumer struct {
in chan *nats.Msg in chan *nats.Msg
// channel for all NATS read errors // channel for all NATS read errors
errs chan error errs chan error
// channel for all incoming parsed points // channel for all incoming parsed metrics
metricC chan telegraf.Metric metricC chan telegraf.Metric
done chan struct{} done chan struct{}
} }
@ -53,8 +53,8 @@ var sampleConfig = `
subjects = ["telegraf"] subjects = ["telegraf"]
### name a queue group ### name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals ### Maximum number of metrics to buffer between collection intervals
point_buffer = 100000 metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite" ### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read ### Each data format has it's own unique set of configuration options, read
@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error {
} }
n.done = make(chan struct{}) n.done = make(chan struct{})
if n.PointBuffer == 0 { if n.MetricBuffer == 0 {
n.PointBuffer = 100000 n.MetricBuffer = 100000
} }
n.metricC = make(chan telegraf.Metric, n.PointBuffer) n.metricC = make(chan telegraf.Metric, n.MetricBuffer)
// Start the message reader // Start the message reader
go n.receiver() go n.receiver()
@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error {
} }
// receiver() reads all incoming messages from NATS, and parses them into // receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points. // telegraf metrics.
func (n *natsConsumer) receiver() { func (n *natsConsumer) receiver() {
defer n.clean() defer n.clean()
for { for {
@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() {
continue continue
default: default:
log.Printf("NATS Consumer buffer is full, dropping a metric." + log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting") " You may want to increase the metric_buffer setting")
} }
} }
@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() {
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
npoints := len(n.metricC) nmetrics := len(n.metricC)
for i := 0; i < npoints; i++ { for i := 0; i < nmetrics; i++ {
point := <-n.metricC metric := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
} }
return nil return nil
} }

View File

@ -15,26 +15,26 @@ const (
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257" invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5 metricBuffer = 5
) )
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, pointBuffer) in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{ n := &natsConsumer{
QueueGroup: "test", QueueGroup: "test",
Subjects: []string{"telegraf"}, Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"}, Servers: []string{"nats://localhost:4222"},
Secure: false, Secure: false,
PointBuffer: pointBuffer, MetricBuffer: metricBuffer,
in: in, in: in,
errs: make(chan error, pointBuffer), errs: make(chan error, metricBuffer),
done: make(chan struct{}), done: make(chan struct{}),
metricC: make(chan telegraf.Metric, pointBuffer), metricC: make(chan telegraf.Metric, metricBuffer),
} }
return n, in return n, in
} }
// Test that the parser parses NATS messages into points // Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) { func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) {
} }
} }
// Test that points are dropped when we hit the buffer limit // Test that metrics are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) { func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
for i := 0; i < pointBuffer+1; i++ { for i := 0; i < metricBuffer+1; i++ {
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if a := len(n.metricC); a != pointBuffer { if a := len(n.metricC); a != metricBuffer {
t.Errorf("got %v, expected %v", a, pointBuffer) t.Errorf("got %v, expected %v", a, metricBuffer)
} }
} }
// Test that the parser parses nats messages into points // Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) { func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) {
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
// Test that the parser parses nats messages into points // Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) { func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
// Test that the parser parses nats messages into points // Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) { func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)