package kafka_consumer import ( "strings" "testing" "time" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/Shopify/sarama.v1" ) const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257" func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) { halt := make(chan bool, 1) metricChan := make(chan []byte, 1) kafkaChan := make(chan *sarama.ConsumerMessage, 10) for i := 0; i < 10; i++ { kafkaChan <- saramaMsg(testMsg) } expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error { batch := <-metricChan assert.Equal(t, expectedBatch, string(batch)) halt <- true return nil }, halt) } func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) { halt := make(chan bool, 1) metricChan := make(chan []byte, 1) kafkaChan := make(chan *sarama.ConsumerMessage, 10) for i := 0; i < 3; i++ { kafkaChan <- saramaMsg(testMsg) } expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error { batch := <-metricChan assert.Equal(t, expectedBatch, string(batch)) halt <- true return nil }, halt) } func TestEmitMetricsSendMetricsToAcc(t *testing.T) { k := &Kafka{} var acc testutil.Accumulator testChan := make(chan []byte, 1) testChan <- []byte(testMsg) err := emitMetrics(k, &acc, testChan) require.NoError(t, err) assert.Equal(t, 1, len(acc.Points), "there should be a single point") point := acc.Points[0] assert.Equal(t, "cpu_load_short", point.Measurement) assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values) assert.Equal(t, map[string]string{ "host": "server01", "direction": "in", "region": "us-west", }, point.Tags) assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time) } func TestEmitMetricsTimesOut(t *testing.T) { k := &Kafka{} var acc testutil.Accumulator testChan := make(chan []byte) err := emitMetrics(k, &acc, testChan) require.NoError(t, err) assert.Equal(t, 0, len(acc.Points), "there should not be a any points") } func saramaMsg(val string) *sarama.ConsumerMessage { return &sarama.ConsumerMessage{ Key: nil, Value: []byte(val), Offset: 0, Partition: 0, } }