I want to test an application that consumes Kafka messages and writes them to a log. Here is an approximate representation of it in Scala-like pseudocode:
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.Properties
import java.util.HashMap

object ConsumerApp extends App {
  // Map of topic name to the number of streams requested for it
  val topic = new HashMap[String, Integer]()
  topic.put("test", 1)
  val logger = LoggerFactory.getLogger(getClass().getName())
  val messageStream = Consumer
    .createJavaConsumerConnector(new ConsumerConfig(new Properties()))
    .createMessageStreams(topic)
    .get("test").get(0) // look up the streams by topic name, not by the map itself
  // Iterating over the stream blocks until the next message arrives
  for (message <- messageStream) {
    val gotMessage = new String(message.message())
    logger.info(gotMessage)
  }
}
The test scenario I have in mind is the following:
- The Kafka server is started.
- The application is started and connects to the Kafka server, beginning to listen for messages on a specific topic.
- A message is sent to the topic.
- The application consumes the message and logs it.
Here is a draft of the test in Scala-like pseudocode:
// Assumes the scalatest-embedded-kafka library for EmbeddedKafka
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{FlatSpec, Matchers}
import uk.org.lidalia.slf4jtest.TestLoggerFactory
import uk.org.lidalia.slf4jtest.LoggingEvent.info

abstract class UnitSpec extends FlatSpec with Matchers with EmbeddedKafka

class ConsumerAppSpec extends UnitSpec {
  "ConsumerApp" should "consume and log messages from Kafka on specific topic" in {
    withRunningKafka {
      val consumer = ConsumerApp
      // intercept the logger to be able to test that the Kafka message is logged
      val logger = TestLoggerFactory.getTestLogger(consumer.getClass)
      // start the application, but beforehand do something to prevent it from blocking forever
      ???
      consumer.main(Array())
      // publish a test message
      publishStringMessageToKafka("test", "TEST")
      // confirm that the message has been properly logged
      ???
    }
    EmbeddedKafka.stop()
  }
}
My issue is with the part of the test code at the first set of three question marks. If I execute the main() method, it never returns, which prevents the remainder of the test from running.
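To make the blocking problem concrete, here is a minimal sketch of one direction I can imagine: running the blocking call on a background thread so the test thread stays free to publish and assert. It does not use Kafka at all. BackgroundMainSketch, topic, logged, blockingMain and runScenario are hypothetical stand-ins (a LinkedBlockingQueue plays the role of the topic, another one the role of the intercepted logger), and I am only assuming that the same pattern would carry over to ConsumerApp.main.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object BackgroundMainSketch {
  // Hypothetical stand-in for the Kafka topic: take() blocks like the consumer stream does.
  val topic = new LinkedBlockingQueue[String]()
  // Hypothetical stand-in for the intercepted logger: collects what would be logged.
  val logged = new LinkedBlockingQueue[String]()

  // Mimics ConsumerApp.main: loops forever, blocking until the next message arrives.
  def blockingMain(): Unit =
    while (true) logged.put(topic.take())

  // Runs the blocking loop in the background, publishes one message,
  // and waits up to five seconds for it to show up in the "log".
  def runScenario(): Option[String] = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      executor.submit(new Runnable {
        def run(): Unit =
          try blockingMain()
          catch { case _: InterruptedException => () } // raised by shutdownNow() below
      })
      topic.put("TEST")
      Option(logged.poll(5, TimeUnit.SECONDS))
    } finally {
      executor.shutdownNow() // interrupts the blocked take() so the thread can exit
    }
  }
}
```

In the sketch, runScenario() returns Some("TEST") once the background loop has picked up the message. Whether interrupting the thread is enough to stop a real Kafka message stream is something I have not verified, so the shutdown step in particular is an assumption.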