package kafka_consumer import ( "fmt" "testing" "time" "github.com/Shopify/sarama" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestReadsMetricsFromKafka(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } brokerPeers := []string{testutil.GetLocalHost() + ":9092"} zkPeers := []string{testutil.GetLocalHost() + ":2181"} testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) // Send a Kafka message to the kafka host msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257" producer, err := sarama.NewSyncProducer(brokerPeers, nil) require.NoError(t, err) _, _, err = producer.SendMessage( &sarama.ProducerMessage{ Topic: testTopic, Value: sarama.StringEncoder(msg), }) require.NoError(t, err) defer producer.Close() // Start the Kafka Consumer k := &Kafka{ ConsumerGroup: "telegraf_test_consumers", Topics: []string{testTopic}, ZookeeperPeers: zkPeers, PointBuffer: 100000, Offset: "oldest", } if err := k.Start(); err != nil { t.Fatal(err.Error()) } else { defer k.Stop() } waitForPoint(k, t) // Verify that we can now gather the sent message var acc testutil.Accumulator // Sanity check assert.Equal(t, 0, len(acc.Points), "There should not be any points") // Gather points err = k.Gather(&acc) require.NoError(t, err) if len(acc.Points) == 1 { point := acc.Points[0] assert.Equal(t, "cpu_load_short", point.Measurement) assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) assert.Equal(t, map[string]string{ "host": "server01", "direction": "in", "region": "us-west", }, point.Tags) assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) } else { t.Errorf("No points found in accumulator, expected 1") } } // Waits for the metric that was sent to the kafka broker to arrive at the kafka // consumer func waitForPoint(k *Kafka, t *testing.T) { // Give the kafka container up to 2 seconds to get the point to the consumer ticker := time.NewTicker(5 * time.Millisecond) counter := 0 for { select { case <-ticker.C: counter++ if counter > 1000 { t.Fatal("Waited for 5s, point never arrived to consumer") } else if len(k.pointChan) == 1 { return } } } }