Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676

This commit is contained in:
Dragostin Yanev (netixen) 2016-02-12 12:05:33 +02:00
parent 8b2578b475
commit 5c4146fe68
3 changed files with 35 additions and 35 deletions

View File

@ -19,8 +19,8 @@ from a NATS cluster in parallel.
subjects = ["telegraf"] subjects = ["telegraf"]
### name a queue group ### name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals ### Maximum number of metrics to buffer between collection intervals
point_buffer = 100000 metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite" ### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read ### Each data format has it's own unique set of configuration options, read

View File

@ -28,7 +28,7 @@ type natsConsumer struct {
Servers []string Servers []string
Secure bool Secure bool
PointBuffer int MetricBuffer int
parser parsers.Parser parser parsers.Parser
sync.Mutex sync.Mutex
@ -39,7 +39,7 @@ type natsConsumer struct {
in chan *nats.Msg in chan *nats.Msg
// channel for all NATS read errors // channel for all NATS read errors
errs chan error errs chan error
// channel for all incoming parsed points // channel for all incoming parsed metrics
metricC chan telegraf.Metric metricC chan telegraf.Metric
done chan struct{} done chan struct{}
} }
@ -53,8 +53,8 @@ var sampleConfig = `
subjects = ["telegraf"] subjects = ["telegraf"]
### name a queue group ### name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals ### Maximum number of metrics to buffer between collection intervals
point_buffer = 100000 metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite" ### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read ### Each data format has it's own unique set of configuration options, read
@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error {
} }
n.done = make(chan struct{}) n.done = make(chan struct{})
if n.PointBuffer == 0 { if n.MetricBuffer == 0 {
n.PointBuffer = 100000 n.MetricBuffer = 100000
} }
n.metricC = make(chan telegraf.Metric, n.PointBuffer) n.metricC = make(chan telegraf.Metric, n.MetricBuffer)
// Start the message reader // Start the message reader
go n.receiver() go n.receiver()
@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error {
} }
// receiver() reads all incoming messages from NATS, and parses them into // receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points. // telegraf metrics.
func (n *natsConsumer) receiver() { func (n *natsConsumer) receiver() {
defer n.clean() defer n.clean()
for { for {
@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() {
continue continue
default: default:
log.Printf("NATS Consumer buffer is full, dropping a metric." + log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting") " You may want to increase the metric_buffer setting")
} }
} }
@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() {
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
npoints := len(n.metricC) nmetrics := len(n.metricC)
for i := 0; i < npoints; i++ { for i := 0; i < nmetrics; i++ {
point := <-n.metricC metric := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
} }
return nil return nil
} }

View File

@ -15,26 +15,26 @@ const (
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257" invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5 metricBuffer = 5
) )
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, pointBuffer) in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{ n := &natsConsumer{
QueueGroup: "test", QueueGroup: "test",
Subjects: []string{"telegraf"}, Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"}, Servers: []string{"nats://localhost:4222"},
Secure: false, Secure: false,
PointBuffer: pointBuffer, MetricBuffer: metricBuffer,
in: in, in: in,
errs: make(chan error, pointBuffer), errs: make(chan error, metricBuffer),
done: make(chan struct{}), done: make(chan struct{}),
metricC: make(chan telegraf.Metric, pointBuffer), metricC: make(chan telegraf.Metric, metricBuffer),
} }
return n, in return n, in
} }
// Test that the parser parses NATS messages into points // Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) { func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -64,24 +64,24 @@ func TestRunParserInvalidMsg(t *testing.T) {
} }
} }
// Test that points are dropped when we hit the buffer limit // Test that metrics are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) { func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
for i := 0; i < pointBuffer+1; i++ { for i := 0; i < metricBuffer+1; i++ {
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if a := len(n.metricC); a != pointBuffer { if a := len(n.metricC); a != metricBuffer {
t.Errorf("got %v, expected %v", a, pointBuffer) t.Errorf("got %v, expected %v", a, metricBuffer)
} }
} }
// Test that the parser parses nats messages into points // Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) { func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -101,7 +101,7 @@ func TestRunParserAndGather(t *testing.T) {
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
// Test that the parser parses nats messages into points // Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) { func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)
@ -121,7 +121,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
// Test that the parser parses nats messages into points // Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) { func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer() n, in := newTestNatsConsumer()
defer close(n.done) defer close(n.done)