Flush based on buffer size rather than time

this includes:
- Add Accumulator to the Start() function of service inputs
- For message consumer plugins, use the Accumulator to constantly add
  metrics and make Gather a dummy function
- rework unit tests to match this new behavior.
- make "flush_buffer_when_full" a config option that defaults to true

closes #666
This commit is contained in:
Cameron Sparr
2016-02-15 17:21:38 -07:00
committed by Michele Fadda
parent b9293f2778
commit eb5dddf0d6
15 changed files with 271 additions and 285 deletions

View File

@@ -28,8 +28,10 @@ type natsConsumer struct {
Servers []string
Secure bool
// Legacy metric buffer support
MetricBuffer int
parser parsers.Parser
parser parsers.Parser
sync.Mutex
Conn *nats.Conn
@@ -39,9 +41,8 @@ type natsConsumer struct {
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed metrics
metricC chan telegraf.Metric
done chan struct{}
done chan struct{}
acc telegraf.Accumulator
}
var sampleConfig = `
@@ -53,9 +54,7 @@ var sampleConfig = `
subjects = ["telegraf"]
### name a queue group
queue_group = "telegraf_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
@@ -84,10 +83,12 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
}
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
n.acc = acc
var connectErr error
opts := nats.DefaultOptions
@@ -115,11 +116,6 @@ func (n *natsConsumer) Start() error {
}
n.done = make(chan struct{})
if n.MetricBuffer == 0 {
n.MetricBuffer = 100000
}
n.metricC = make(chan telegraf.Metric, n.MetricBuffer)
// Start the message reader
go n.receiver()
@@ -146,13 +142,7 @@ func (n *natsConsumer) receiver() {
}
for _, metric := range metrics {
select {
case n.metricC <- metric:
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the metric_buffer setting")
}
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
@@ -163,7 +153,6 @@ func (n *natsConsumer) clean() {
n.Lock()
defer n.Unlock()
close(n.in)
close(n.metricC)
close(n.errs)
for _, sub := range n.Subs {
@@ -185,13 +174,6 @@ func (n *natsConsumer) Stop() {
}
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
nmetrics := len(n.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-n.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}