lundi 3 décembre 2018

Test freezes when trying to run a KafkaServerStartable with Scala

I'm trying to convert a Java test sample into a Scala test sample, but something doesn't seem to work.

/**
 * Smoke test that publishes a single message to an embedded Kafka broker and reads it back.
 *
 * A fresh ZooKeeper + Kafka pair (see `KafkaTestFixture`) is started before every test and
 * torn down afterwards.
 *
 * NOTE(review): `producerProps()` and `buildConsumer(...)` are called below but are not
 * defined in this excerpt — presumably they live elsewhere in the suite; confirm.
 */
class KafkaMostBasicTest extends FlatSpec with Matchers with BeforeAndAfterEach {

  // Time-stamped topic name so that repeated runs do not share a topic.
  private val topic: String = s"topic1-${System.currentTimeMillis}"

  // Assigned in beforeEach; any of these may still be null when afterEach runs.
  private var server: KafkaTestFixture = _
  private var producer: Producer[String, String] = _
  private var consumerConnector: ConsumerConnector = _

  /** Starts the embedded broker, creates a producer and publishes one message to `topic`. */
  override def beforeEach(): Unit = {
    super.beforeEach()

    this.server = new KafkaTestFixture()
    this.server.start(serverProperties())

    // Create a producer
    this.producer = new KafkaProducer[String, String](producerProps())

    // Send a message and block until the send has completed
    this.producer.send(new ProducerRecord(topic, "message")).get()
  }

  /**
   * Releases producer, consumer connector and embedded broker.
   *
   * Every resource is null-checked (nothing in this class assigns `consumerConnector`, and
   * setup may have failed partway), and each step runs in its own `finally` so that one
   * failing shutdown does not leak the remaining resources.
   */
  override def afterEach(): Unit = {
    try Option(producer).foreach(_.close())
    finally {
      try Option(consumerConnector).foreach(_.shutdown())
      finally {
        try Option(server).foreach(_.stop())
        finally super.afterEach()
      }
    }
  }

  "The kefka message" should "be 'message'" in {

    // Create a consumer
    val it: ConsumerIterator[String, String] = buildConsumer(topic)

    // Read the message back
    val messageAndMetadata: MessageAndMetadata[String, String] = it.next()
    val value: String = messageAndMetadata.message()
    value shouldEqual "message"
  }

  /** Broker configuration: ZooKeeper at localhost:2181 and broker id 1. */
  private def serverProperties(): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("broker.id", "1")
    props
  }

  /** Owns the in-process ZooKeeper test server and the Kafka broker that connects to it. */
  private class KafkaTestFixture {
    private var zk: TestingServer = _
    private var kafka: KafkaServerStartable = _

    /**
     * Starts ZooKeeper on the port named in `zookeeper.connect`, then starts Kafka.
     *
     * @param properties broker configuration; must contain `zookeeper.connect` as `host:port`
     */
    @throws[Exception]
    def start(properties: Properties): Unit = {
      val port: Int = getZkPort(properties)
      this.zk = new TestingServer(port)
      this.zk.start()

      val kafkaConfig = new KafkaConfig(properties)
      this.kafka = new KafkaServerStartable(kafkaConfig)
      this.kafka.startup()
    }

    /**
     * Shuts down Kafka, then ZooKeeper.
     *
     * Safe to call after a partially failed `start`: components that were never created are
     * skipped, and ZooKeeper is still stopped if the Kafka shutdown throws.
     */
    @throws[IOException]
    def stop(): Unit = {
      try Option(kafka).foreach(_.shutdown())
      finally Option(zk).foreach { zkServer =>
        zkServer.stop()
        zkServer.close()
      }
    }

    /**
     * Extracts the port from the `zookeeper.connect` property.
     *
     * @throws IllegalArgumentException if the property is missing or has no `:port` part
     * @throws NumberFormatException    if the port part is not an integer
     */
    private def getZkPort(properties: Properties): Int = {
      val url = Option(properties.getProperty("zookeeper.connect")).getOrElse(
        throw new IllegalArgumentException("Property 'zookeeper.connect' is not set"))
      val port = url.split(":").lift(1).getOrElse(
        throw new IllegalArgumentException(
          s"Property 'zookeeper.connect' must be of the form host:port but was '$url'"))
      port.toInt
    }
  }
}

By process of elimination, I tracked the problem down to the line this.kafka.startup(). Up to that line the test runs successfully, but when that line executes the test just stops: no errors, no failures.

I tried adding try/catch and printing out any error that might occur, but there are no problems/errors that I can see.

The resulting output from the test run is:

$ sbt test
[info] Loading project definition from /mnt/c/tmp/sbt-kafka-test/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to sbt-kafka-test (in build file:/mnt/c/tmp/sbt-kafka-test/)
[info] Compiling 1 Scala source to /mnt/c/tmp/sbt-kafka-test/target/scala-2.12/test-classes ...
[warn] there was one deprecation warning; re-run with -deprecation for details
[warn] one warning found
[info] Done compiling.
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.server.ZooKeeperServerMain).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[info] HelloTest:
[info] The Hello object
[info] - should say hello

Full Scala project is available on GitHub

A working Java project is also available on GitHub.

Aucun commentaire:

Enregistrer un commentaire