Wednesday, 28 November 2018

sbt test fails my Spark test suite while IntelliJ tests pass

I am trying to test the behaviour of a class that consumes and processes DataFrames.

Following this previous question, How to write unit tests in Spark 2.0+?, I tried to use the loan pattern to run my tests. I have a SparkSession trait:

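import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * This trait allows using Spark in unit tests
  * https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0
  */
trait SparkSetup {

  // Loan pattern: build (or reuse) a local SparkSession and lend it to the test body
  def withSparkSession(testMethod: SparkSession => Any): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    // getOrCreate() returns an existing session if there is one, otherwise builds a new one
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    try {
      testMethod(sparkSession)
    }
    // finally sparkSession.stop()
  }
}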
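For reference, the loan pattern acquires a resource, lends it to a function, and releases it in a finally block so that cleanup runs even when the function throws. In the trait above the finally clause is commented out, so the SparkSession is never stopped. The following Spark-free sketch shows the complete pattern; FakeSession and LoanPattern are hypothetical stand-ins for SparkSession and SparkSetup, not part of Spark or ScalaTest.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for SparkSession: it only records whether stop() was called.
final class FakeSession(val name: String) {
  private var stoppedFlag = false
  def stopped: Boolean = stoppedFlag
  def stop(): Unit = { stoppedFlag = true }
}

// Hypothetical stand-in for the SparkSetup trait, with the finally clause enabled.
object LoanPattern {
  // Records every session handed out, so a caller can check that each was released.
  val created: ListBuffer[FakeSession] = ListBuffer.empty[FakeSession]

  def withSession[A](body: FakeSession => A): A = {
    val session = new FakeSession("test") // acquire the resource
    created += session
    try body(session)                     // lend it to the caller
    finally session.stop()                // release it, even if body throws
  }
}
```

For example, LoanPattern.withSession(s => s.name.length) returns 4, and the session it lent is stopped afterwards. Whether a test helper should stop the session at all is a design choice: stopping gives each test a clean session, while keeping one session alive avoids paying Spark's startup cost for every test.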

I use it in my test class:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

class InnerNormalizationStrategySpec
  extends WordSpec
  with Matchers
  with BeforeAndAfterAll
  with SparkSetup {
  ...
  "A correct contact message" should {
    "be normalized without errors" in withSparkSession { ss =>
      import ss.implicits._ // provides the encoder used by .as[NormalizedContactHistoryMessage]

      // Single-row DataFrame with one non-nullable string column named "value"
      val df = ss.createDataFrame(
        ss.sparkContext.parallelize(Seq[Row](Row(validContact))),
        StructType(List(StructField("value", StringType, nullable = false))))

      val result = target.innerTransform(df)

      val collectedResult: Array[NormalizedContactHistoryMessage] = result
        .where(result.col("contact").isNotNull)
        .as[NormalizedContactHistoryMessage]
        .collect()

      collectedResult.isEmpty should be(false) // There should be something
      collectedResult.length should be(1) // There should be exactly 1 message...
      collectedResult.head.contact.isDefined should be(true) // ... of type contact.
    }
  }
  ...
}

When I run the tests with IntelliJ's test runner (running the whole Spec class at once), every test written this way passes. However, running sbt test from the terminal makes all of them fail. I suspected parallel execution, so I added

concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)

to my settings, but it didn't help. Any ideas?
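For context, the setting mentioned above goes in build.sbt. The sketch below shows it next to other sbt keys that are commonly recommended for Spark test suites. The extra keys (parallelExecution, fork, javaOptions) and the heap size are assumptions added for illustration, not part of the original setup, and whether any of them fixes the failure described here is untested.

```scala
// build.sbt (sbt 0.13 / 1.x "in" syntax): a sketch, not a verified fix

// Run at most one test task at a time across the build
// (the setting already tried in the question).
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)

// Run this project's test classes sequentially instead of in parallel.
parallelExecution in Test := false

// Run the tests in a separate JVM instead of inside sbt's own JVM
// (assumption: often suggested for Spark/Hive suites to isolate JVM state).
fork in Test := true

// JVM options only take effect for forked tests; the heap size is illustrative.
javaOptions in Test ++= Seq("-Xmx2G")
```

One difference this targets: IntelliJ's default ScalaTest run configuration launches the suite in its own JVM, whereas sbt test without fork runs the suites inside the sbt JVM.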
