I'm trying to convert a Java test sample into a Scala test sample, but something doesn't seem to work.
/**
 * Round-trip smoke test: starts an in-process ZooKeeper + Kafka broker, publishes one
 * record to a uniquely named topic and checks that a consumer reads the same value back.
 *
 * `producerProps()` and `buildConsumer(topic)` are referenced here but are not defined in
 * this excerpt; they are expected to be provided elsewhere in the class/file.
 */
class KafkaMostBasicTest extends FlatSpec with Matchers with BeforeAndAfterEach {

  // Unique per test-class instance so records from earlier runs are never picked up.
  private val topic: String = s"topic1-${System.currentTimeMillis()}"

  // Assigned in beforeEach / by the consumer builder; null until then, hence the guards in afterEach.
  private var server: KafkaTestFixture = _
  private var producer: Producer[String, String] = _
  private var consumerConnector: ConsumerConnector = _

  /** Starts the embedded broker, creates a producer and synchronously sends one record. */
  override def beforeEach(): Unit = {
    server = new KafkaTestFixture()
    server.start(serverProperties())
    // Create a producer
    producer = new KafkaProducer[String, String](producerProps())
    // Send a message; get() blocks until the send has completed (or failed).
    producer.send(new ProducerRecord(topic, "message")).get()
  }

  /**
   * Releases producer, consumer connector and broker, in that order.
   *
   * Each resource is null-guarded and released in its own `finally` step so that a failure in
   * `beforeEach` (or a test that never built a consumer) neither raises a NullPointerException
   * here nor prevents the remaining resources from being shut down.
   */
  override def afterEach(): Unit = {
    try Option(producer).foreach(_.close())
    finally {
      try Option(consumerConnector).foreach(_.shutdown())
      finally Option(server).foreach(_.stop())
    }
  }

  "The kefka message" should "be 'message'" in {
    // Create a consumer
    val it: ConsumerIterator[String, String] = buildConsumer(topic)
    // Read the record back
    val messageAndMetadata: MessageAndMetadata[String, String] = it.next()
    val value: String = messageAndMetadata.message()
    value shouldEqual "message"
  }

  /** Broker configuration used by the fixture: ZooKeeper address and broker id only. */
  private def serverProperties(): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("broker.id", "1")
    props
  }

  /** Owns the lifecycle of the embedded ZooKeeper test server and the Kafka broker. */
  private class KafkaTestFixture {
    private var zk: TestingServer = _
    private var kafka: KafkaServerStartable = _

    /**
     * Starts ZooKeeper on the port taken from `zookeeper.connect`, then starts the broker.
     *
     * If creating or starting the broker throws, the already-running ZooKeeper server is
     * stopped again before the exception is rethrown, so a failed start leaves nothing behind.
     *
     * NOTE(review): some Kafka versions of `KafkaServerStartable.startup()` are reported to
     * terminate the JVM on a startup failure instead of throwing, which would look like the
     * test run "just stopping" with no error -- confirm against the Kafka version in use and
     * configure log4j so the broker's fatal log line becomes visible.
     *
     * @param properties broker configuration; must contain `zookeeper.connect`
     */
    @throws[Exception]
    def start(properties: Properties): Unit = {
      val port: Integer = getZkPort(properties)
      zk = new TestingServer(port)
      zk.start()
      try {
        kafka = new KafkaServerStartable(new KafkaConfig(properties))
        kafka.startup()
      } catch {
        case e: Throwable =>
          // Do not leak the ZooKeeper server when the broker cannot be brought up.
          try stopZk()
          catch { case suppressed: Throwable => e.addSuppressed(suppressed) }
          throw e
      }
    }

    /** Shuts down the broker, then ZooKeeper; safe to call after a partial or failed start. */
    @throws[IOException]
    def stop(): Unit = {
      try Option(kafka).foreach(_.shutdown())
      finally stopZk()
    }

    /** Stops and closes the ZooKeeper test server if it was created. */
    private def stopZk(): Unit =
      Option(zk).foreach { z =>
        try z.stop()
        finally z.close()
      }

    /**
     * Extracts the port of the first `host:port` entry of `zookeeper.connect`.
     *
     * Additional comma-separated hosts and a trailing chroot path (`/path`) are ignored.
     *
     * @throws IllegalArgumentException if `zookeeper.connect` is not set
     * @throws NumberFormatException    if the first entry does not end in a numeric port
     */
    private def getZkPort(properties: Properties): Integer = {
      val connect: String = Option(properties.getProperty("zookeeper.connect")).getOrElse(
        throw new IllegalArgumentException("zookeeper.connect is not set")
      )
      val firstEndpoint: String = connect.split(",")(0).split("/")(0).trim
      val port: String = firstEndpoint.substring(firstEndpoint.lastIndexOf(':') + 1)
      Integer.valueOf(port)
    }
  }
}
Using elimination, I tracked the problem down to the line this.kafka.startup().
Up to that line the test runs successfully, but when this line executes the test just stops: no errors, no failures.
I tried adding try/catch and printing out any error that might occur, but there are no problems/errors that I can see.
The resulting output from the test run is:
$ sbt test
[info] Loading project definition from /mnt/c/tmp/sbt-kafka-test/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to sbt-kafka-test (in build file:/mnt/c/tmp/sbt-kafka-test/)
[info] Compiling 1 Scala source to /mnt/c/tmp/sbt-kafka-test/target/scala-2.12/test-classes ...
[warn] there was one deprecation warning; re-run with -deprecation for details
[warn] one warning found
[info] Done compiling.
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.server.ZooKeeperServerMain).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[info] HelloTest:
[info] The Hello object
[info] - should say hello
Full Scala project is available on GitHub
A working Java project is also available on GitHub.
Aucun commentaire:
Enregistrer un commentaire