Change point_buffer to metric_buffer to conform will changes in https://github.com/influxdata/telegraf/pull/676

closes #680
This commit is contained in:
Dragostin Yanev (netixen)
2016-02-12 12:05:33 +02:00
committed by Michele Fadda
parent 31d7694039
commit 6b0ea64d1b
5 changed files with 39 additions and 37 deletions

View File

@@ -28,8 +28,8 @@ type natsConsumer struct {
Servers []string
Secure bool
PointBuffer int
parser parsers.Parser
MetricBuffer int
parser parsers.Parser
sync.Mutex
Conn *nats.Conn
@@ -39,7 +39,7 @@ type natsConsumer struct {
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed points
// channel for all incoming parsed metrics
metricC chan telegraf.Metric
done chan struct{}
}
@@ -53,8 +53,8 @@ var sampleConfig = `
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
@@ -115,11 +115,11 @@ func (n *natsConsumer) Start() error {
}
n.done = make(chan struct{})
if n.PointBuffer == 0 {
n.PointBuffer = 100000
if n.MetricBuffer == 0 {
n.MetricBuffer = 100000
}
n.metricC = make(chan telegraf.Metric, n.PointBuffer)
n.metricC = make(chan telegraf.Metric, n.MetricBuffer)
// Start the message reader
go n.receiver()
@@ -130,7 +130,7 @@ func (n *natsConsumer) Start() error {
}
// receiver() reads all incoming messages from NATS, and parses them into
// influxdb metric points.
// telegraf metrics.
func (n *natsConsumer) receiver() {
defer n.clean()
for {
@@ -151,7 +151,7 @@ func (n *natsConsumer) receiver() {
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}
@@ -187,10 +187,10 @@ func (n *natsConsumer) Stop() {
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
npoints := len(n.metricC)
for i := 0; i < npoints; i++ {
point := <-n.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(n.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-n.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}