Fix kafka plugin and rename to kafka_consumer

fixes #371
This commit is contained in:
Cameron Sparr
2015-11-16 13:12:45 -07:00
parent a3feddd8ed
commit 970bfce997
11 changed files with 462 additions and 430 deletions

View File

@@ -15,43 +15,77 @@ func TestReadsMetricsFromKafka(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
var zkPeers, brokerPeers []string
zkPeers = []string{testutil.GetLocalHost() + ":2181"}
brokerPeers = []string{testutil.GetLocalHost() + ":9092"}
k := &Kafka{
ConsumerGroupName: "telegraf_test_consumers",
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
ZookeeperPeers: zkPeers,
}
brokerPeers := []string{testutil.GetLocalHost() + ":9092"}
zkPeers := []string{testutil.GetLocalHost() + ":2181"}
testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix())
// Send a Kafka message to the kafka host
msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
require.NoError(t, err)
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
_, _, err = producer.SendMessage(
&sarama.ProducerMessage{
Topic: testTopic,
Value: sarama.StringEncoder(msg),
})
require.NoError(t, err)
defer producer.Close()
producer.Close()
// Start the Kafka Consumer
k := &Kafka{
ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic},
ZookeeperPeers: zkPeers,
PointBuffer: 100000,
Offset: "oldest",
}
if err := k.Start(); err != nil {
t.Fatal(err.Error())
} else {
defer k.Stop()
}
waitForPoint(k, t)
// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Points), "there should not be any points")
assert.Equal(t, 0, len(acc.Points), "There should not be any points")
// Gather points
err = k.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, 1, len(acc.Points), "there should be a single point")
point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
if len(acc.Points) == 1 {
point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
} else {
t.Errorf("No points found in accumulator, expected 1")
}
}
// Waits for the metric that was sent to the kafka broker to arrive at the kafka
// consumer
func waitForPoint(k *Kafka, t *testing.T) {
// Give the kafka container up to 2 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
counter := 0
for {
select {
case <-ticker.C:
counter++
if counter > 1000 {
t.Fatal("Waited for 5s, point never arrived to consumer")
} else if len(k.pointChan) == 1 {
return
}
}
}
}