Flush based on buffer size rather than time
This includes:

- Add an Accumulator to the Start() function of service inputs.
- For message consumer plugins, use the Accumulator to add metrics continuously, and make Gather a dummy function.
- Rework the unit tests to match this new behavior.
- Make "flush_buffer_when_full" a config option that defaults to true.

closes #666
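Together these points amount to one pattern change for service inputs: the accumulator is handed to the plugin once at startup, the plugin's reader goroutine feeds it continuously, and Gather no longer has work to do. A minimal sketch of that pattern, using illustrative stand-ins rather than the real telegraf definitions:

```go
// A minimal sketch, assuming the telegraf interfaces of this era; the
// Accumulator and Consumer below are illustrative stand-ins, not the
// real telegraf definitions.
package sketch

import "time"

// Accumulator mirrors the one call the kafka consumer now makes directly
// from its reader goroutine: AddFields.
type Accumulator interface {
	AddFields(measurement string, fields map[string]interface{},
		tags map[string]string, t ...time.Time)
}

// Consumer is a hypothetical message-consumer plugin on the new pattern.
type Consumer struct {
	acc  Accumulator
	done chan struct{}
}

// Start receives the accumulator up front and keeps it on the plugin.
func (c *Consumer) Start(acc Accumulator) error {
	c.acc = acc
	c.done = make(chan struct{})
	go c.receiver()
	return nil
}

// receiver pushes each metric into the accumulator as it arrives,
// instead of queueing it in a bounded channel for Gather to drain later.
func (c *Consumer) receiver() {
	ticker := time.NewTicker(time.Second) // stands in for a message source
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			return
		case t := <-ticker.C:
			c.acc.AddFields("example",
				map[string]interface{}{"value": 1}, nil, t)
		}
	}
}

// Gather is now a dummy: metrics have already been handed to the agent.
func (c *Consumer) Gather(_ Accumulator) error { return nil }

// Stop shuts the reader goroutine down.
func (c *Consumer) Stop() { close(c.done) }
```

The kafka_consumer diff below is exactly this reshaping: the metricC channel and its buffer bookkeeping disappear, Start gains the acc parameter, and the receiver calls acc.AddFields directly.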
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -1,7 +1,6 @@
 package kafka_consumer
 
 import (
-    "fmt"
     "log"
     "strings"
     "sync"
@@ -19,11 +18,13 @@ type Kafka struct {
     Topics         []string
     ZookeeperPeers []string
     Consumer       *consumergroup.ConsumerGroup
-    MetricBuffer   int
+
+    // Legacy metric buffer support
+    MetricBuffer int
     // TODO remove PointBuffer, legacy support
     PointBuffer int
-    Offset      string
+
+    Offset string
     parser parsers.Parser
 
     sync.Mutex
@@ -32,9 +33,10 @@ type Kafka struct {
     in   <-chan *sarama.ConsumerMessage
     // channel for all kafka consumer errors
     errs <-chan *sarama.ConsumerError
-    // channel for all incoming parsed kafka metrics
-    metricC chan telegraf.Metric
-    done    chan struct{}
+    done chan struct{}
+
+    // keep the accumulator internally:
+    acc telegraf.Accumulator
+
     // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
     // this is mostly for test purposes, but there may be a use-case for it later.
@@ -48,8 +50,6 @@ var sampleConfig = `
   zookeeper_peers = ["localhost:2181"]
   ### the name of the consumer group
   consumer_group = "telegraf_metrics_consumers"
-  ### Maximum number of metrics to buffer between collection intervals
-  metric_buffer = 100000
   ### Offset (must be either "oldest" or "newest")
   offset = "oldest"
 
@@ -72,11 +72,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
     k.parser = parser
 }
 
-func (k *Kafka) Start() error {
+func (k *Kafka) Start(acc telegraf.Accumulator) error {
     k.Lock()
     defer k.Unlock()
     var consumerErr error
 
+    k.acc = acc
+
     config := consumergroup.NewConfig()
     switch strings.ToLower(k.Offset) {
     case "oldest", "":
@@ -106,13 +108,6 @@ func (k *Kafka) Start() error {
     }
 
     k.done = make(chan struct{})
-    if k.PointBuffer == 0 && k.MetricBuffer == 0 {
-        k.MetricBuffer = 100000
-    } else if k.PointBuffer > 0 {
-        // Legacy support of PointBuffer field TODO remove
-        k.MetricBuffer = k.PointBuffer
-    }
-    k.metricC = make(chan telegraf.Metric, k.MetricBuffer)
 
     // Start the kafka message reader
     go k.receiver()
@@ -138,14 +133,7 @@ func (k *Kafka) receiver() {
             }
 
             for _, metric := range metrics {
-                fmt.Println(string(metric.Name()))
-                select {
-                case k.metricC <- metric:
-                    continue
-                default:
-                    log.Printf("Kafka Consumer buffer is full, dropping a metric." +
-                        " You may want to increase the metric_buffer setting")
-                }
+                k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
             }
 
             if !k.doNotCommitMsgs {
@@ -169,13 +157,6 @@ func (k *Kafka) Stop() {
 }
 
 func (k *Kafka) Gather(acc telegraf.Accumulator) error {
-    k.Lock()
-    defer k.Unlock()
-    nmetrics := len(k.metricC)
-    for i := 0; i < nmetrics; i++ {
-        metric := <-k.metricC
-        acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
-    }
     return nil
 }
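With Gather reduced to a no-op above, the plugin no longer drops metrics when a private channel fills; the back-pressure question moves to the agent's output buffer under the new flush_buffer_when_full option named in the commit message. A hypothetical, simplified sketch of that semantics (not the actual agent code):

```go
// Hypothetical, simplified sketch of flush_buffer_when_full: once metrics
// flow into the agent's buffer continuously, a full buffer triggers an
// immediate flush instead of dropping metrics until the next interval.
package sketch

type Metric map[string]interface{}

type Buffer struct {
	FlushWhenFull bool           // mirrors flush_buffer_when_full (default true)
	Size          int            // capacity before a forced flush
	Flush         func([]Metric) // writes the batch to the outputs
	metrics       []Metric
}

// Add appends a metric; when the buffer is full and the option is on,
// it flushes right away rather than waiting for the timer tick.
func (b *Buffer) Add(m Metric) {
	b.metrics = append(b.metrics, m)
	if b.FlushWhenFull && len(b.metrics) >= b.Size {
		b.Flush(b.metrics)
		b.metrics = b.metrics[:0]
	}
}
```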
--- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
@@ -44,18 +44,19 @@ func TestReadsMetricsFromKafka(t *testing.T) {
     }
     p, _ := parsers.NewInfluxParser()
     k.SetParser(p)
-    if err := k.Start(); err != nil {
+
+    // Verify that we can now gather the sent message
+    var acc testutil.Accumulator
+
+    // Sanity check
+    assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
+    if err := k.Start(&acc); err != nil {
         t.Fatal(err.Error())
     } else {
         defer k.Stop()
     }
 
-    waitForPoint(k, t)
-
-    // Verify that we can now gather the sent message
-    var acc testutil.Accumulator
-    // Sanity check
-    assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
+    waitForPoint(&acc, t)
 
     // Gather points
     err = k.Gather(&acc)
@@ -77,7 +78,7 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 
 // Waits for the metric that was sent to the kafka broker to arrive at the kafka
 // consumer
-func waitForPoint(k *Kafka, t *testing.T) {
+func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
     // Give the kafka container up to 2 seconds to get the point to the consumer
     ticker := time.NewTicker(5 * time.Millisecond)
     counter := 0
@@ -87,7 +88,7 @@ func waitForPoint(k *Kafka, t *testing.T) {
             counter++
             if counter > 1000 {
                 t.Fatal("Waited for 5s, point never arrived to consumer")
-            } else if len(k.metricC) == 1 {
+            } else if acc.NFields() == 1 {
                 return
             }
         }
 
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -4,7 +4,6 @@ import (
     "testing"
     "time"
 
-    "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/parsers"
     "github.com/influxdata/telegraf/testutil"
 
@@ -17,29 +16,28 @@ const (
     testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
     testMsgJSON     = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
     invalidMsg      = "cpu_load_short,host=server01 1422568543702900257"
-    pointBuffer     = 5
 )
 
-func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
-    in := make(chan *sarama.ConsumerMessage, pointBuffer)
+func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
+    in := make(chan *sarama.ConsumerMessage, 1000)
     k := Kafka{
         ConsumerGroup:   "test",
         Topics:          []string{"telegraf"},
         ZookeeperPeers:  []string{"localhost:2181"},
-        PointBuffer:     pointBuffer,
         Offset:          "oldest",
         in:              in,
         doNotCommitMsgs: true,
-        errs:            make(chan *sarama.ConsumerError, pointBuffer),
+        errs:            make(chan *sarama.ConsumerError, 1000),
         done:            make(chan struct{}),
-        metricC:         make(chan telegraf.Metric, pointBuffer),
     }
     return &k, in
 }
 
 // Test that the parser parses kafka messages into points
 func TestRunParser(t *testing.T) {
-    k, in := NewTestKafka()
+    k, in := newTestKafka()
+    acc := testutil.Accumulator{}
+    k.acc = &acc
     defer close(k.done)
 
     k.parser, _ = parsers.NewInfluxParser()
@@ -47,12 +45,14 @@ func TestRunParser(t *testing.T) {
     in <- saramaMsg(testMsg)
     time.Sleep(time.Millisecond)
 
-    assert.Equal(t, len(k.metricC), 1)
+    assert.Equal(t, acc.NFields(), 1)
 }
 
 // Test that the parser ignores invalid messages
 func TestRunParserInvalidMsg(t *testing.T) {
-    k, in := NewTestKafka()
+    k, in := newTestKafka()
+    acc := testutil.Accumulator{}
+    k.acc = &acc
     defer close(k.done)
 
     k.parser, _ = parsers.NewInfluxParser()
@@ -60,27 +60,14 @@ func TestRunParserInvalidMsg(t *testing.T) {
     in <- saramaMsg(invalidMsg)
     time.Sleep(time.Millisecond)
 
-    assert.Equal(t, len(k.metricC), 0)
-}
-
-// Test that points are dropped when we hit the buffer limit
-func TestRunParserRespectsBuffer(t *testing.T) {
-    k, in := NewTestKafka()
-    defer close(k.done)
-
-    k.parser, _ = parsers.NewInfluxParser()
-    go k.receiver()
-    for i := 0; i < pointBuffer+1; i++ {
-        in <- saramaMsg(testMsg)
-    }
-    time.Sleep(time.Millisecond)
-
-    assert.Equal(t, len(k.metricC), 5)
+    assert.Equal(t, acc.NFields(), 0)
 }
 
 // Test that the parser parses kafka messages into points
 func TestRunParserAndGather(t *testing.T) {
-    k, in := NewTestKafka()
+    k, in := newTestKafka()
+    acc := testutil.Accumulator{}
+    k.acc = &acc
     defer close(k.done)
 
     k.parser, _ = parsers.NewInfluxParser()
@@ -88,17 +75,18 @@ func TestRunParserAndGather(t *testing.T) {
     in <- saramaMsg(testMsg)
     time.Sleep(time.Millisecond)
 
-    acc := testutil.Accumulator{}
     k.Gather(&acc)
 
-    assert.Equal(t, len(acc.Metrics), 1)
+    assert.Equal(t, acc.NFields(), 1)
     acc.AssertContainsFields(t, "cpu_load_short",
         map[string]interface{}{"value": float64(23422)})
 }
 
 // Test that the parser parses kafka messages into points
 func TestRunParserAndGatherGraphite(t *testing.T) {
-    k, in := NewTestKafka()
+    k, in := newTestKafka()
+    acc := testutil.Accumulator{}
+    k.acc = &acc
     defer close(k.done)
 
     k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
@@ -106,17 +94,18 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
     in <- saramaMsg(testMsgGraphite)
     time.Sleep(time.Millisecond)
 
-    acc := testutil.Accumulator{}
     k.Gather(&acc)
 
-    assert.Equal(t, len(acc.Metrics), 1)
+    assert.Equal(t, acc.NFields(), 1)
     acc.AssertContainsFields(t, "cpu_load_short_graphite",
         map[string]interface{}{"value": float64(23422)})
 }
 
 // Test that the parser parses kafka messages into points
 func TestRunParserAndGatherJSON(t *testing.T) {
-    k, in := NewTestKafka()
+    k, in := newTestKafka()
+    acc := testutil.Accumulator{}
+    k.acc = &acc
     defer close(k.done)
 
     k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
@@ -124,10 +113,9 @@ func TestRunParserAndGatherJSON(t *testing.T) {
    in <- saramaMsg(testMsgJSON)
    time.Sleep(time.Millisecond)
 
-    acc := testutil.Accumulator{}
     k.Gather(&acc)
 
-    assert.Equal(t, len(acc.Metrics), 1)
+    assert.Equal(t, acc.NFields(), 2)
     acc.AssertContainsFields(t, "kafka_json_test",
         map[string]interface{}{
             "a": float64(5),