Tuesday, July 10, 2018

How to perform functional testing of a Kafka consumer application?

I want to test an application that consumes Kafka messages and writes them to a log. Here is an approximate representation of it in Scala-like pseudocode:

import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.Properties
import java.util.HashMap

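object ConsumerApp extends App {
  // Map of topic name -> number of streams to create for that topic
  val topic = new HashMap[String, Integer]()
  topic.put("test", 1)
  val logger = LoggerFactory.getLogger(getClass().getName())
  // Placeholder config: a real ConsumerConfig needs at least
  // zookeeper.connect and group.id set in the Properties
  val messageStream = Consumer
    .createJavaConsumerConnector(new ConsumerConfig(new Properties()))
    .createMessageStreams(topic)
    .get("test").get(0) // the result is keyed by topic name, not by the request map
  // Blocks indefinitely, logging each message as it arrives
  for (message <- messageStream) {
    val gotMessage = new String(message.message())
    logger.info(gotMessage)
  }
}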

The test scenario I have in mind is the following:

  • Kafka server is started.

  • The application is started and connects to the Kafka server, beginning to listen for messages on a specific topic.

  • A message is sent to the topic.

  • The application consumes the message and logs it.

Here is a draft of the test in Scala-like pseudocode:

import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{FlatSpec, Matchers}
import uk.org.lidalia.slf4jtest.TestLoggerFactory
import uk.org.lidalia.slf4jtest.LoggingEvent.info

abstract class UnitSpec extends FlatSpec with Matchers with EmbeddedKafka {

}

class ConsumerAppSpec extends UnitSpec {
  "ConsumerApp" should "consume and log messages from Kafka on specific topic" in {
    withRunningKafka {
      val consumer = ConsumerApp
      // intercept the logger to be able to test that the Kafka message is logged
      val logger = TestLoggerFactory.getTestLogger(consumer.getClass)
      // start the application, but beforehand do something to prevent it infinitely blocking
      ???
      consumer.main(Array())  

      // publish a test message    
      publishStringMessageToKafka("test", "TEST")
      // Confirm that the message has been properly logged
      ???
    }
    EmbeddedKafka.stop()
  }
}

My issue is with the part of the test code at the first set of three question marks. If I call the main() method, it never returns, which prevents the remainder of the test from running.
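To make the blocking problem concrete, here is a minimal sketch of one possible direction, using only the standard library: run the blocking entry point on a daemon thread, then poll for the expected side effect with a timeout. Everything in it is a hypothetical stand-in rather than part of the application or of any Kafka library: `topic` is an in-memory queue playing the role of the Kafka topic, `logged` plays the role of the intercepted logger, `blockingMain` plays the role of `ConsumerApp.main`, and `startInBackground` and `eventually` are illustrative helper names.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit}

object BackgroundConsumerSketch {
  // Hypothetical stand-in for the Kafka topic: an in-memory blocking queue.
  val topic = new LinkedBlockingQueue[String]()

  // Hypothetical stand-in for the intercepted logger: records each "logged" message.
  val logged = new ConcurrentLinkedQueue[String]()

  // Stand-in for ConsumerApp.main: never returns, like the real consumer loop.
  def blockingMain(): Unit =
    while (true) logged.add(topic.take())

  // Runs a blocking body on a daemon thread, so the caller gets control back
  // immediately and the thread cannot keep the JVM alive after the test ends.
  def startInBackground(body: => Unit): Thread = {
    val worker = new Thread(new Runnable { def run(): Unit = body })
    worker.setDaemon(true)
    worker.start()
    worker
  }

  // Polls a condition every 10 ms until it holds or the timeout expires,
  // and returns whether the condition held in the end.
  def eventually(timeoutMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (!condition && System.nanoTime() < deadline) Thread.sleep(10)
    condition
  }
}
```

With these stand-ins, the flow would be `startInBackground(blockingMain())`, then `topic.put("TEST")`, then `assert(eventually(5000)(logged.contains("TEST")))`. In the real test, the first call would presumably wrap `consumer.main(Array())` and the polled condition would inspect the test logger instead. Whether the test logger exposes events recorded on another thread is an assumption that should be checked against the slf4j-test documentation.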
